[jira] [Commented] (SPARK-24432) Add support for dynamic resource allocation
[ https://issues.apache.org/jira/browse/SPARK-24432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17120813#comment-17120813 ] zhoukang commented on SPARK-24432: -- Looking forward to this feature coming true, +1. By the way, [~dongjoon] [~liyinan926], how can we join the sig-big-data meeting now, since the URL https://github.com/kubernetes/community/tree/master/sig-big-data is invalid? > Add support for dynamic resource allocation > --- > > Key: SPARK-24432 > URL: https://issues.apache.org/jira/browse/SPARK-24432 > Project: Spark > Issue Type: New Feature > Components: Kubernetes, Spark Core > Affects Versions: 3.1.0 > Reporter: Yinan Li > Priority: Major > > This is an umbrella ticket for work on adding support for dynamic resource > allocation in Kubernetes mode. This requires a Kubernetes-specific > external shuffle service. The feature is available in our fork at > github.com/apache-spark-on-k8s/spark. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-29564) Cluster deploy mode should support Spark Thrift server
[ https://issues.apache.org/jira/browse/SPARK-29564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17028327#comment-17028327 ] zhoukang commented on SPARK-29564: -- Any progress, [~cltlfcjin]? We also support yarn-cluster mode in our production environment, using ZooKeeper as a router. > Cluster deploy mode should support Spark Thrift server > -- > > Key: SPARK-29564 > URL: https://issues.apache.org/jira/browse/SPARK-29564 > Project: Spark > Issue Type: Bug > Components: Spark Submit, SQL > Affects Versions: 2.4.4, 3.0.0 > Reporter: Lantao Jin > Priority: Major > > Cluster deploy mode is not applicable to the Spark Thrift server now. This > restriction is too strict. > In our production, we use multiple Spark Thrift servers as long-running > services, launched in yarn-cluster mode. The life cycle of an STS is > managed by an upper-layer manager system, which is also used to dispatch users' > JDBC connections to an applicable STS.
[jira] [Created] (SPARK-29880) Handle submit exception when target hadoop cluster is Federation
zhoukang created SPARK-29880: Summary: Handle submit exception when target hadoop cluster is Federation Key: SPARK-29880 URL: https://issues.apache.org/jira/browse/SPARK-29880 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.4.4 Reporter: zhoukang When we submit an application to a YARN Federation cluster, the submission exits with a failure, because getYarnClusterMetrics is not implemented. {code:java} def submitApplication(): ApplicationId = { ResourceRequestHelper.validateResources(sparkConf) var appId: ApplicationId = null try { launcherBackend.connect() yarnClient.init(hadoopConf) yarnClient.start() logInfo("Requesting a new application from cluster with %d NodeManagers" .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers)) {code}
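One hedged way to handle this (a sketch only, not the actual Client.scala change; `describeCluster` and `fetchMetrics` are illustrative names) is to tolerate the failing metrics call and degrade the log line instead of aborting the submission:

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical sketch: wrap the cluster-metrics lookup so an unimplemented
// RPC on a YARN Federation router degrades the log message rather than
// failing the whole submission.
object SubmitLogging {
  def describeCluster(fetchMetrics: () => Int): String =
    Try(fetchMetrics()) match {
      case Success(n) =>
        s"Requesting a new application from cluster with $n NodeManagers"
      case Failure(_) =>
        "Requesting a new application (cluster metrics unavailable, e.g. YARN Federation)"
    }
}
```

The submission path would then proceed regardless of whether the metrics RPC is supported by the target cluster.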
[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin
[ https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16928204#comment-16928204 ] zhoukang commented on SPARK-21492: -- Any progress on this issue, [~jiangxb1987]? We have also encountered this problem. > Memory leak in SortMergeJoin > > > Key: SPARK-21492 > URL: https://issues.apache.org/jira/browse/SPARK-21492 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.2.0, 2.3.0, 2.3.1, 3.0.0 > Reporter: Zhan Zhang > Priority: Major > > In SortMergeJoin, if the iterator is not exhausted, there will be a memory leak > caused by the sort. The memory is not released until the task ends, and cannot > be used by other operators, causing a performance drop or OOM.
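Spark generally frees such resources with a completion-iterator pattern: a wrapper that fires a cleanup callback once its input is exhausted. A simplified sketch of the idea (not Spark's actual `CompletionIterator`) also shows why it cannot help here: if the join never drains the iterator, the callback never fires, which is exactly the leak described above.

```scala
// Simplified sketch of the completion-iterator pattern: run a cleanup
// callback (e.g. freeing sort buffers) as soon as the input is exhausted.
// If the consumer stops early, the callback never fires -- the leak above.
class CleanupIterator[A](sub: Iterator[A], cleanup: () => Unit) extends Iterator[A] {
  private var cleaned = false
  override def hasNext: Boolean = {
    val more = sub.hasNext
    if (!more && !cleaned) { cleaned = true; cleanup() }
    more
  }
  override def next(): A = sub.next()
}
```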
[jira] [Commented] (SPARK-27068) Support failed jobs ui and completed jobs ui use different queue
[ https://issues.apache.org/jira/browse/SPARK-27068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16861780#comment-16861780 ] zhoukang commented on SPARK-27068: -- Sorry [~dongjoon], I am not familiar with Hive, but I think this is useful for some use cases. Maybe we can add a configuration switch to turn this off? > Support failed jobs ui and completed jobs ui use different queue > > > Key: SPARK-27068 > URL: https://issues.apache.org/jira/browse/SPARK-27068 > Project: Spark > Issue Type: Improvement > Components: Web UI > Affects Versions: 3.0.0 > Reporter: zhoukang > Priority: Major > Attachments: 屏幕快照 2019-06-06 下午1.12.04.png > > > For some long-running jobs, we may want to check the cause of some failed > jobs. > But once most jobs have completed, the failed jobs may disappear from the UI; we can use > a different queue for these two kinds of jobs.
[jira] [Commented] (SPARK-27827) File does not exist notice is misleading in FileScanRDD
[ https://issues.apache.org/jira/browse/SPARK-27827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16857297#comment-16857297 ] zhoukang commented on SPARK-27827: -- I just tested this on a 2.3 cluster, [~dongjoon]. > File does not exist notice is misleading in FileScanRDD > --- > > Key: SPARK-27827 > URL: https://issues.apache.org/jira/browse/SPARK-27827 > Project: Spark > Issue Type: Improvement > Components: SQL > Affects Versions: 2.3.2 > Reporter: zhoukang > Priority: Minor > > When we encounter the error below, we will try "REFRESH TABLE" and expect the > error not to be thrown again. > {code:java} > Error: java.lang.IllegalStateException: Can't overwrite cause with > java.io.FileNotFoundException: File does not exist: > /user/s_xdata/kuduhive_warehouse/info_dev/dws_quality_time_dictionary/part-3-92c84bf9-99c0-49d9-8cdf-78b1844d75c3.snappy.parquet > It is possible the underlying files have been updated. You can explicitly > invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in > SQL or by recreating the Dataset/DataFrame involved. (state=,code=0) > {code} > The cause is that 'InMemoryFileIndex' is cached in 'HiveMetaStoreCatalog', and the > refresh command only invalidates the table for the current session. The notice is > misleading when we have a long-running thriftserver.
[jira] [Updated] (SPARK-27827) File does not exist notice is misleading in FileScanRDD
[ https://issues.apache.org/jira/browse/SPARK-27827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-27827: - Affects Version/s: (was: 2.4.3) > File does not exist notice is misleading in FileScanRDD > --- > > Key: SPARK-27827 > URL: https://issues.apache.org/jira/browse/SPARK-27827 > Project: Spark > Issue Type: Improvement > Components: SQL > Affects Versions: 2.3.2 > Reporter: zhoukang > Priority: Minor > > When we encounter the error below, we will try "REFRESH TABLE" and expect the > error not to be thrown again. > {code:java} > Error: java.lang.IllegalStateException: Can't overwrite cause with > java.io.FileNotFoundException: File does not exist: > /user/s_xdata/kuduhive_warehouse/info_dev/dws_quality_time_dictionary/part-3-92c84bf9-99c0-49d9-8cdf-78b1844d75c3.snappy.parquet > It is possible the underlying files have been updated. You can explicitly > invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in > SQL or by recreating the Dataset/DataFrame involved. (state=,code=0) > {code} > The cause is that 'InMemoryFileIndex' is cached in 'HiveMetaStoreCatalog', and the > refresh command only invalidates the table for the current session. The notice is > misleading when we have a long-running thriftserver.
[jira] [Commented] (SPARK-27068) Support failed jobs ui and completed jobs ui use different queue
[ https://issues.apache.org/jira/browse/SPARK-27068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16857291#comment-16857291 ] zhoukang commented on SPARK-27068: -- [~srowen] Here is a use case in our cluster. We have a long-running Spark SQL thriftserver, and users use it as an ad-hoc query engine and also for an online BI service. The number of failures is not too large, but the total query count increases quickly, as shown in the image below. When we want to find the root cause of a failed query, it is currently really not convenient. !屏幕快照 2019-06-06 下午1.12.04.png! > Support failed jobs ui and completed jobs ui use different queue > > > Key: SPARK-27068 > URL: https://issues.apache.org/jira/browse/SPARK-27068 > Project: Spark > Issue Type: Improvement > Components: Web UI > Affects Versions: 2.4.0 > Reporter: zhoukang > Priority: Major > Attachments: 屏幕快照 2019-06-06 下午1.12.04.png > > > For some long-running jobs, we may want to check the cause of some failed > jobs. > But once most jobs have completed, the failed jobs may disappear from the UI; we can use > a different queue for these two kinds of jobs.
[jira] [Updated] (SPARK-27068) Support failed jobs ui and completed jobs ui use different queue
[ https://issues.apache.org/jira/browse/SPARK-27068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-27068: - Attachment: 屏幕快照 2019-06-06 下午1.12.04.png > Support failed jobs ui and completed jobs ui use different queue > > > Key: SPARK-27068 > URL: https://issues.apache.org/jira/browse/SPARK-27068 > Project: Spark > Issue Type: Improvement > Components: Web UI > Affects Versions: 2.4.0 > Reporter: zhoukang > Priority: Major > Attachments: 屏幕快照 2019-06-06 下午1.12.04.png > > > For some long-running jobs, we may want to check the cause of some failed > jobs. > But once most jobs have completed, the failed jobs may disappear from the UI; we can use > a different queue for these two kinds of jobs.
[jira] [Created] (SPARK-27827) File does not exist notice is misleading in FileScanRDD
zhoukang created SPARK-27827: Summary: File does not exist notice is misleading in FileScanRDD Key: SPARK-27827 URL: https://issues.apache.org/jira/browse/SPARK-27827 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.3, 2.3.2 Reporter: zhoukang When we encounter the error below, we will try "REFRESH TABLE" and expect the error not to be thrown again. {code:java} Error: java.lang.IllegalStateException: Can't overwrite cause with java.io.FileNotFoundException: File does not exist: /user/s_xdata/kuduhive_warehouse/info_dev/dws_quality_time_dictionary/part-3-92c84bf9-99c0-49d9-8cdf-78b1844d75c3.snappy.parquet It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved. (state=,code=0) {code} The cause is that 'InMemoryFileIndex' is cached in 'HiveMetaStoreCatalog', and the refresh command only invalidates the table for the current session. The notice is misleading when we have a long-running thriftserver.
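The session-scoped staleness can be illustrated without Spark. Below is a toy model (the `Session` class and its fields are purely illustrative, not Spark's actual catalog code): each session caches its own resolved file list, and a refresh clears only the calling session's cache, so another session keeps serving the stale, deleted files.

```scala
// Toy model of the reported behavior: per-session caching of a resolved file
// list, with REFRESH invalidating only the calling session's cache.
class Session(catalog: () => List[String]) {
  private var cached: Option[List[String]] = None

  // First call resolves against the catalog; later calls reuse the cache.
  def files(): List[String] = {
    if (cached.isEmpty) cached = Some(catalog())
    cached.get
  }

  // Analogous to REFRESH TABLE: drops only this session's cached index.
  def refresh(): Unit = cached = None
}
```

In this model, after the underlying files change, only the session that ran `refresh()` sees the new list, mirroring why the "REFRESH TABLE" hint is misleading on a shared long-running thriftserver.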
[jira] [Commented] (SPARK-25299) Use remote storage for persisting shuffle data
[ https://issues.apache.org/jira/browse/SPARK-25299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16823613#comment-16823613 ] zhoukang commented on SPARK-25299: -- Nice work! Really looking forward to it, thanks [~yifeih]. > Use remote storage for persisting shuffle data > -- > > Key: SPARK-25299 > URL: https://issues.apache.org/jira/browse/SPARK-25299 > Project: Spark > Issue Type: New Feature > Components: Shuffle > Affects Versions: 2.4.0 > Reporter: Matt Cheah > Priority: Major > > In Spark, the shuffle primitive requires Spark executors to persist data to > the local disk of the worker nodes. If executors crash, the external shuffle > service can continue to serve the shuffle data that was written beyond the > lifetime of the executor itself. In YARN, Mesos, and Standalone mode, the > external shuffle service is deployed on every worker node. The shuffle > service shares local disk with the executors that run on its node. > There are some shortcomings with the way shuffle is fundamentally implemented > right now. Particularly: > * If any external shuffle service process or node becomes unavailable, all > applications that had an executor that ran on that node must recompute the > shuffle blocks that were lost. > * Similarly to the above, the external shuffle service must be kept running > at all times, which may waste resources when no applications are using that > shuffle service node. > * Mounting local storage can prevent users from taking advantage of > desirable isolation benefits from using containerized environments, like > Kubernetes. We had an external shuffle service implementation in an early > prototype of the Kubernetes backend, but it was rejected due to its strict > requirement to be able to mount hostPath volumes or other persistent volume > setups. 
> In the following [architecture discussion > document|https://docs.google.com/document/d/1uCkzGGVG17oGC6BJ75TpzLAZNorvrAU3FRd2X-rVHSM/edit#heading=h.btqugnmt2h40] > (note: _not_ an SPIP), we brainstorm various high level architectures for > improving the external shuffle service in a way that addresses the above > problems. The purpose of this umbrella JIRA is to promote additional > discussion on how we can approach these problems, both at the architecture > level and the implementation level. We anticipate filing sub-issues that > break down the tasks that must be completed to achieve this goal.
[jira] [Commented] (SPARK-25299) Use remote storage for persisting shuffle data
[ https://issues.apache.org/jira/browse/SPARK-25299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16822997#comment-16822997 ] zhoukang commented on SPARK-25299: -- is there any progress of this task? [~yifeih] [~mcheah] > Use remote storage for persisting shuffle data > -- > > Key: SPARK-25299 > URL: https://issues.apache.org/jira/browse/SPARK-25299 > Project: Spark > Issue Type: New Feature > Components: Shuffle >Affects Versions: 2.4.0 >Reporter: Matt Cheah >Priority: Major > > In Spark, the shuffle primitive requires Spark executors to persist data to > the local disk of the worker nodes. If executors crash, the external shuffle > service can continue to serve the shuffle data that was written beyond the > lifetime of the executor itself. In YARN, Mesos, and Standalone mode, the > external shuffle service is deployed on every worker node. The shuffle > service shares local disk with the executors that run on its node. > There are some shortcomings with the way shuffle is fundamentally implemented > right now. Particularly: > * If any external shuffle service process or node becomes unavailable, all > applications that had an executor that ran on that node must recompute the > shuffle blocks that were lost. > * Similarly to the above, the external shuffle service must be kept running > at all times, which may waste resources when no applications are using that > shuffle service node. > * Mounting local storage can prevent users from taking advantage of > desirable isolation benefits from using containerized environments, like > Kubernetes. We had an external shuffle service implementation in an early > prototype of the Kubernetes backend, but it was rejected due to its strict > requirement to be able to mount hostPath volumes or other persistent volume > setups. 
> In the following [architecture discussion > document|https://docs.google.com/document/d/1uCkzGGVG17oGC6BJ75TpzLAZNorvrAU3FRd2X-rVHSM/edit#heading=h.btqugnmt2h40] > (note: _not_ an SPIP), we brainstorm various high level architectures for > improving the external shuffle service in a way that addresses the above > problems. The purpose of this umbrella JIRA is to promote additional > discussion on how we can approach these problems, both at the architecture > level and the implementation level. We anticipate filing sub-issues that > break down the tasks that must be completed to achieve this goal.
[jira] [Commented] (SPARK-26568) Too many partitions may cause thriftServer frequently Full GC
[ https://issues.apache.org/jira/browse/SPARK-26568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815016#comment-16815016 ] zhoukang commented on SPARK-26568: -- It is the Hive client used in Spark that causes this problem, [~srowen]. > Too many partitions may cause thriftServer frequently Full GC > - > > Key: SPARK-26568 > URL: https://issues.apache.org/jira/browse/SPARK-26568 > Project: Spark > Issue Type: Improvement > Components: SQL > Affects Versions: 2.4.0 > Reporter: zhoukang > Priority: Major > > The reason is: > first, we have a table with many partitions (maybe several hundred); second, we > have some concurrent queries. Then the long-running thriftServer may encounter an > OOM issue. > Here is a case: > call stack of the OOM thread: > {code:java} > pool-34-thread-10 > at > org.apache.hadoop.hive.metastore.api.StorageDescriptor.<init>(Lorg/apache/hadoop/hive/metastore/api/StorageDescriptor;)V > (StorageDescriptor.java:240) > at > org.apache.hadoop.hive.metastore.api.Partition.<init>(Lorg/apache/hadoop/hive/metastore/api/Partition;)V > (Partition.java:216) > at > org.apache.hadoop.hive.metastore.HiveMetaStoreClient.deepCopy(Lorg/apache/hadoop/hive/metastore/api/Partition;)Lorg/apache/hadoop/hive/metastore/api/Partition; > (HiveMetaStoreClient.java:1343) > at > org.apache.hadoop.hive.metastore.HiveMetaStoreClient.deepCopyPartitions(Ljava/util/Collection;Ljava/util/List;)Ljava/util/List; > (HiveMetaStoreClient.java:1409) > at > org.apache.hadoop.hive.metastore.HiveMetaStoreClient.deepCopyPartitions(Ljava/util/List;)Ljava/util/List; > (HiveMetaStoreClient.java:1397) > at > org.apache.hadoop.hive.metastore.HiveMetaStoreClient.listPartitionsByFilter(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;S)Ljava/util/List; > (HiveMetaStoreClient.java:914) > at > sun.reflect.GeneratedMethodAccessor98.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object; > (Unknown Source) > at > 
sun.reflect.DelegatingMethodAccessorImpl.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object; > (DelegatingMethodAccessorImpl.java:43) > at > java.lang.reflect.Method.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object; > (Method.java:606) > at > org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(Ljava/lang/Object;Ljava/lang/reflect/Method;[Ljava/lang/Object;)Ljava/lang/Object; > (RetryingMetaStoreClient.java:90) > at > com.sun.proxy.$Proxy30.listPartitionsByFilter(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;S)Ljava/util/List; > (Unknown Source) > at > org.apache.hadoop.hive.ql.metadata.Hive.getPartitionsByFilter(Lorg/apache/hadoop/hive/ql/metadata/Table;Ljava/lang/String;)Ljava/util/List; > (Hive.java:1967) > at > sun.reflect.GeneratedMethodAccessor97.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object; > (Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object; > (DelegatingMethodAccessorImpl.java:43) > at > java.lang.reflect.Method.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object; > (Method.java:606) > at > org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(Lorg/apache/hadoop/hive/ql/metadata/Hive;Lorg/apache/hadoop/hive/ql/metadata/Table;Lscala/collection/Seq;)Lscala/collection/Seq; > (HiveShim.scala:602) > at > org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitionsByFilter$1.apply()Lscala/collection/Seq; > (HiveClientImpl.scala:608) > at > org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitionsByFilter$1.apply()Ljava/lang/Object; > (HiveClientImpl.scala:606) > at > org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply()Ljava/lang/Object; > (HiveClientImpl.scala:321) > at > org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(Lscala/Function0;Lscala/runtime/IntRef;Lscala/runtime/ObjectRef;Ljava/lang/Object;)V > (HiveClientImpl.scala:264) > at > 
org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(Lscala/Function0;)Ljava/lang/Object; > (HiveClientImpl.scala:263) > at > org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(Lscala/Function0;)Ljava/lang/Object; > (HiveClientImpl.scala:307) > at > org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionsByFilter(Lorg/apache/spark/sql/catalyst/catalog/CatalogTable;Lscala/collection/Seq;)Lscala/collection/Seq; > (HiveClientImpl.scala:606) > at > org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$listPartitionsByFilter$1.apply()Lscala/collection/Seq; > (HiveExternalCatalog.scala:1017) > at > org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$listPartitionsByFilter$1.apply()Ljava/lang/Object; > (HiveExternalCatalog.scala:1000) > at >
[jira] [Commented] (SPARK-26703) Hive record writer will always depends on parquet-1.6 writer should fix it
[ https://issues.apache.org/jira/browse/SPARK-26703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815017#comment-16815017 ] zhoukang commented on SPARK-26703: -- I can make a PR for this, [~hyukjin.kwon]. > Hive record writer will always depends on parquet-1.6 writer should fix it > --- > > Key: SPARK-26703 > URL: https://issues.apache.org/jira/browse/SPARK-26703 > Project: Spark > Issue Type: Improvement > Components: SQL > Affects Versions: 2.3.2, 2.4.0 > Reporter: zhoukang > Priority: Major > > Currently, when we use an insert-into-Hive-table related command, > the generated parquet file will always be version 1.6. The reason is below: > 1. we rely on hive-exec's HiveFileFormatUtils to get the recordWriter > {code:java} > private val hiveWriter = HiveFileFormatUtils.getHiveRecordWriter( > jobConf, > tableDesc, > serializer.getSerializedClass, > fileSinkConf, > new Path(path), > Reporter.NULL) > {code} > 2. which calls > {code:java} > public static RecordWriter getHiveRecordWriter(JobConf jc, > TableDesc tableInfo, Class outputClass, > FileSinkDesc conf, Path outPath, Reporter reporter) throws > HiveException { > HiveOutputFormat hiveOutputFormat = getHiveOutputFormat(jc, > tableInfo); > try { > boolean isCompressed = conf.getCompressed(); > JobConf jc_output = jc; > if (isCompressed) { > jc_output = new JobConf(jc); > String codecStr = conf.getCompressCodec(); > if (codecStr != null && !codecStr.trim().equals("")) { > Class codec = > (Class) > JavaUtils.loadClass(codecStr); > FileOutputFormat.setOutputCompressorClass(jc_output, codec); > } > String type = conf.getCompressType(); > if (type != null && !type.trim().equals("")) { > CompressionType style = CompressionType.valueOf(type); > SequenceFileOutputFormat.setOutputCompressionType(jc, style); > } > } > return getRecordWriter(jc_output, hiveOutputFormat, outputClass, > isCompressed, tableInfo.getProperties(), outPath, reporter); > } catch (Exception e) { > throw new HiveException(e); > } > } > 
public static RecordWriter getRecordWriter(JobConf jc, > OutputFormat outputFormat, > Class valueClass, boolean isCompressed, > Properties tableProp, Path outPath, Reporter reporter > ) throws IOException, HiveException { > if (!(outputFormat instanceof HiveOutputFormat)) { > outputFormat = new HivePassThroughOutputFormat(outputFormat); > } > return ((HiveOutputFormat)outputFormat).getHiveRecordWriter( > jc, outPath, valueClass, isCompressed, tableProp, reporter); > } > {code} > 3. then in MapredParquetOutPutFormat > {code:java} > public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter > getHiveRecordWriter( > final JobConf jobConf, > final Path finalOutPath, > final Class valueClass, > final boolean isCompressed, > final Properties tableProperties, > final Progressable progress) throws IOException { > LOG.info("creating new record writer..." + this); > final String columnNameProperty = > tableProperties.getProperty(IOConstants.COLUMNS); > final String columnTypeProperty = > tableProperties.getProperty(IOConstants.COLUMNS_TYPES); > List columnNames; > List columnTypes; > if (columnNameProperty.length() == 0) { > columnNames = new ArrayList(); > } else { > columnNames = Arrays.asList(columnNameProperty.split(",")); > } > if (columnTypeProperty.length() == 0) { > columnTypes = new ArrayList(); > } else { > columnTypes = > TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); > } > > DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, > columnTypes), jobConf); > return getParquerRecordWriterWrapper(realOutputFormat, jobConf, > finalOutPath.toString(), > progress,tableProperties); > } > {code} > 4. 
then call > {code:java} > public ParquetRecordWriterWrapper( > final OutputFormat realOutputFormat, > final JobConf jobConf, > final String name, > final Progressable progress, Properties tableProperties) throws > IOException { > try { > // create a TaskInputOutputContext > TaskAttemptID taskAttemptID = > TaskAttemptID.forName(jobConf.get("mapred.task.id")); > if (taskAttemptID == null) { > taskAttemptID = new TaskAttemptID(); > } > taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID); > LOG.info("initialize serde with table properties."); > initializeSerProperties(taskContext, tableProperties); >
[jira] [Commented] (SPARK-26533) Support query auto cancel on thriftserver
[ https://issues.apache.org/jira/browse/SPARK-26533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815015#comment-16815015 ] zhoukang commented on SPARK-26533: -- I will work on this. > Support query auto cancel on thriftserver > - > > Key: SPARK-26533 > URL: https://issues.apache.org/jira/browse/SPARK-26533 > Project: Spark > Issue Type: Improvement > Components: SQL > Affects Versions: 2.4.0 > Reporter: zhoukang > Priority: Major > > Support automatically cancelling queries that run too long on the thriftserver. > In some cases, we use the thriftserver as a long-running application. > Sometimes we want no query to run for more than a given time. > In these cases, we can enable auto cancel for time-consuming queries, which > lets us release resources for other queries to run.
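One plausible shape for such a feature (a sketch only, not the eventual patch; `QueryWatchdog` and its names are hypothetical) is a per-statement watchdog: arm a timer when the statement starts, have it invoke a cancel action, for the thriftserver something like `sparkContext.cancelJobGroup(statementId)`, if the query outlives the allowed runtime, and disarm it when the query finishes first.

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

// Hypothetical sketch: schedule a cancellation callback per statement.
// The caller cancels the returned handle if the query completes in time.
class QueryWatchdog(maxRuntimeMs: Long) {
  private val timer = Executors.newScheduledThreadPool(1)

  /** Arm the timer; returns a handle to disarm via cancel(false). */
  def arm(cancelQuery: () => Unit): ScheduledFuture[_] =
    timer.schedule(new Runnable { def run(): Unit = cancelQuery() },
      maxRuntimeMs, TimeUnit.MILLISECONDS)

  def shutdown(): Unit = timer.shutdown()
}
```

A design note: cancelling by job group (rather than killing the session) releases the cluster resources while leaving the JDBC connection usable for the next statement.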
[jira] [Commented] (SPARK-27068) Support failed jobs ui and completed jobs ui use different queue
[ https://issues.apache.org/jira/browse/SPARK-27068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815005#comment-16815005 ] zhoukang commented on SPARK-27068: -- For a long-running application like Spark SQL, showing failed jobs in a single table keeps more clues for later debugging, [~shahid]. > Support failed jobs ui and completed jobs ui use different queue > > > Key: SPARK-27068 > URL: https://issues.apache.org/jira/browse/SPARK-27068 > Project: Spark > Issue Type: Improvement > Components: Web UI > Affects Versions: 2.4.0 > Reporter: zhoukang > Priority: Major > > For some long-running jobs, we may want to check the cause of some failed > jobs. > But once most jobs have completed, the failed jobs may disappear from the UI; we can use > a different queue for these two kinds of jobs.
[jira] [Created] (SPARK-27068) Support failed jobs ui and completed jobs ui use different queue
zhoukang created SPARK-27068: Summary: Support failed jobs ui and completed jobs ui use different queue Key: SPARK-27068 URL: https://issues.apache.org/jira/browse/SPARK-27068 Project: Spark Issue Type: Improvement Components: Web UI Affects Versions: 2.4.0 Reporter: zhoukang For some long-running jobs, we may want to check the cause of some failed jobs. But once most jobs have completed, the failed jobs may disappear from the UI; we can use a different queue for these two kinds of jobs.
[jira] [Created] (SPARK-26951) Should not throw KryoException when root cause is IOexception
zhoukang created SPARK-26951: Summary: Should not throw KryoException when root cause is IOexception Key: SPARK-26951 URL: https://issues.apache.org/jira/browse/SPARK-26951 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.4.0 Reporter: zhoukang The job will fail with the exception below: {code:java} Job aborted due to stage failure: Task 1576 in stage 97.0 failed 4 times, most recent failure: Lost task 1576.3 in stage 97.0 (TID 121949, xxx, executor 14): com.esotericsoftware.kryo.KryoException: java.io.IOException: Stream is corrupted. The lz4's magic number should be LZ4Block(4c5a34426c6f636b), but received buffer's head bytes is (). {code} {code:java} Job aborted due to stage failure: Task 1576 in stage 97.0 failed 4 times, most recent failure: Lost task 1576.3 in stage 97.0 (TID 121949, xxx, executor 14): com.esotericsoftware.kryo.KryoException: java.io.IOException: Stream is corrupted. The lz4's magic number should be LZ4Block(4c5a34426c6f636b), but received buffer's head bytes is (). 
at com.esotericsoftware.kryo.io.Input.fill(Input.java:166) at com.esotericsoftware.kryo.io.Input.require(Input.java:196) at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:373) at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:127) at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:804) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:244) at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:180) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:324) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: Stream is corrupted. The lz4's magic number should be LZ4Block(4c5a34426c6f636b), but received buffer's head bytes is (). at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:169) at org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:127) at com.esotericsoftware.kryo.io.Input.fill(Input.java:164) ... 
19 more Driver stacktrace: {code} Since the root cause is an IOException (a corrupted shuffle stream), the task should be retried instead of failing the job with a KryoException. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
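The fix this report asks for amounts to inspecting the cause chain of the thrown exception before deciding whether the failure is retriable. A minimal sketch of such a root-cause check (illustrative names, not Spark's actual retry code):

```java
import java.io.IOException;

public class RootCauseCheck {
    // Walks the cause chain; returns true if any cause is of the given type.
    // Throwable.getCause() returns null at the end of the chain, so this terminates.
    static boolean hasRootCause(Throwable t, Class<? extends Throwable> type) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (type.isInstance(cur)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Simulates the failure above: a Kryo-style exception wrapping an IOException.
        RuntimeException kryoLike =
            new RuntimeException("KryoException", new IOException("Stream is corrupted"));
        // A scheduler could treat this as a retriable IO failure rather than a
        // serialization bug.
        System.out.println(hasRootCause(kryoLike, IOException.class));
    }
}
```

If the unwrapped cause is an IOException from a corrupted shuffle stream, the task can be retried (possibly refetching the block) instead of surfacing a KryoException to the user.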
[jira] [Updated] (SPARK-26914) ThriftServer scheduler pool may be unpredictable when using fair scheduler mode
[ https://issues.apache.org/jira/browse/SPARK-26914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-26914: - Attachment: 26914-03.png 26914-02.png 26914-01.png > ThriftServer scheduler pool may be unpredictable when using fair scheduler mode > -- > > Key: SPARK-26914 > URL: https://issues.apache.org/jira/browse/SPARK-26914 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.4.0 > Reporter: zhoukang > Priority: Major > Attachments: 26914-01.png, 26914-02.png, 26914-03.png > > > When using fair scheduler mode for the Thrift server, the scheduler pool a query lands in can be unpredictable. > {code:java} > val pool = sessionToActivePool.get(parentSession.getSessionHandle) > if (pool != null) { > sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, pool) > } > {code} > Here is an example: some queries that should use the default pool were submitted to the 'normal' pool instead. > I changed the code, added some logging, and got strange results. > I then found that SparkContext's localProperties can behave unpredictably when setLocalProperty is called from pooled threads. Since the Thrift server uses a thread pool to execute queries, this bug is triggered intermittently. > {code:java} > /** > * Set a local property that affects jobs submitted from this thread, such as the Spark fair > * scheduler pool. User-defined properties may also be set here. These properties are propagated > * through to worker tasks and can be accessed there via > * [[org.apache.spark.TaskContext#getLocalProperty]]. > * > * These properties are inherited by child threads spawned from this thread. This > * may have unexpected consequences when working with thread pools. The standard java > * implementation of thread pools have worker threads spawn other worker threads. > * As a result, local properties may propagate unpredictably. > */ > def setLocalProperty(key: String, value: String) { > if (value == null) { > localProperties.get.remove(key) > } else { > localProperties.get.setProperty(key, value) > } > } > {code}
[jira] [Updated] (SPARK-26914) ThriftServer scheduler pool may be unpredictable when using fair scheduler mode
[ https://issues.apache.org/jira/browse/SPARK-26914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-26914: - Description: When using fair scheduler mode for the Thrift server, the scheduler pool a query lands in can be unpredictable. {code:java} val pool = sessionToActivePool.get(parentSession.getSessionHandle) if (pool != null) { sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, pool) } {code} Here is an example: some queries that should use the default pool were submitted to the 'normal' pool instead. !26914-02.png! I changed the code, added some logging, and got strange results. !26914-01.png! !26914-03.png! I then found that SparkContext's localProperties can behave unpredictably when setLocalProperty is called from pooled threads. Since the Thrift server uses a thread pool to execute queries, this bug is triggered intermittently. {code:java} /** * Set a local property that affects jobs submitted from this thread, such as the Spark fair * scheduler pool. User-defined properties may also be set here. These properties are propagated * through to worker tasks and can be accessed there via * [[org.apache.spark.TaskContext#getLocalProperty]]. * * These properties are inherited by child threads spawned from this thread. This * may have unexpected consequences when working with thread pools. The standard java * implementation of thread pools have worker threads spawn other worker threads. * As a result, local properties may propagate unpredictably. */ def setLocalProperty(key: String, value: String) { if (value == null) { localProperties.get.remove(key) } else { localProperties.get.setProperty(key, value) } } {code} was: When using fair scheduler mode for the Thrift server, the scheduler pool a query lands in can be unpredictable.
{code:java} val pool = sessionToActivePool.get(parentSession.getSessionHandle) if (pool != null) { sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, pool) } {code} Here is an example: some queries that should use the default pool were submitted to the 'normal' pool instead. I changed the code, added some logging, and got strange results. I then found that SparkContext's localProperties can behave unpredictably when setLocalProperty is called from pooled threads. Since the Thrift server uses a thread pool to execute queries, this bug is triggered intermittently. {code:java} /** * Set a local property that affects jobs submitted from this thread, such as the Spark fair * scheduler pool. User-defined properties may also be set here. These properties are propagated * through to worker tasks and can be accessed there via * [[org.apache.spark.TaskContext#getLocalProperty]]. * * These properties are inherited by child threads spawned from this thread. This * may have unexpected consequences when working with thread pools. The standard java * implementation of thread pools have worker threads spawn other worker threads. * As a result, local properties may propagate unpredictably. */ def setLocalProperty(key: String, value: String) { if (value == null) { localProperties.get.remove(key) } else { localProperties.get.setProperty(key, value) } } {code} > ThriftServer scheduler pool may be unpredictable when using fair scheduler mode > -- > > Key: SPARK-26914 > URL: https://issues.apache.org/jira/browse/SPARK-26914 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.4.0 > Reporter: zhoukang > Priority: Major > Attachments: 26914-01.png, 26914-02.png, 26914-03.png > > > When using fair scheduler mode for the Thrift server, the scheduler pool a query lands in can be unpredictable.
> {code:java} > val pool = sessionToActivePool.get(parentSession.getSessionHandle) > if (pool != null) { > sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, pool) > } > {code} > Here is an example: some queries that should use the default pool were submitted to the 'normal' pool instead. > !26914-02.png! > I changed the code, added some logging, and got strange results. > !26914-01.png! > !26914-03.png! > I then found that SparkContext's localProperties can behave unpredictably when setLocalProperty is called from pooled threads. Since the Thrift server uses a thread pool to execute queries, this bug is triggered intermittently. > {code:java} > /** > * Set a local property that affects jobs submitted from this thread, such as the Spark fair > * scheduler pool. User-defined properties may also be set here. These properties are propagated > * through to worker tasks and can be accessed there via > * [[org.apache.spark.TaskContext#getLocalProperty]]. > * > * These properties are inherited by child threads spawned from
[jira] [Created] (SPARK-26914) ThriftServer scheduler pool may be unpredictable when using fair scheduler mode
zhoukang created SPARK-26914: Summary: ThriftServer scheduler pool may be unpredictable when using fair scheduler mode Key: SPARK-26914 URL: https://issues.apache.org/jira/browse/SPARK-26914 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.4.0 Reporter: zhoukang When using fair scheduler mode for the Thrift server, the scheduler pool a query lands in can be unpredictable. {code:java} val pool = sessionToActivePool.get(parentSession.getSessionHandle) if (pool != null) { sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, pool) } {code} Here is an example: some queries that should use the default pool were submitted to the 'normal' pool instead. I changed the code, added some logging, and got strange results. I then found that SparkContext's localProperties can behave unpredictably when setLocalProperty is called from pooled threads. Since the Thrift server uses a thread pool to execute queries, this bug is triggered intermittently. {code:java} /** * Set a local property that affects jobs submitted from this thread, such as the Spark fair * scheduler pool. User-defined properties may also be set here. These properties are propagated * through to worker tasks and can be accessed there via * [[org.apache.spark.TaskContext#getLocalProperty]]. * * These properties are inherited by child threads spawned from this thread. This * may have unexpected consequences when working with thread pools. The standard java * implementation of thread pools have worker threads spawn other worker threads. * As a result, local properties may propagate unpredictably. */ def setLocalProperty(key: String, value: String) { if (value == null) { localProperties.get.remove(key) } else { localProperties.get.setProperty(key, value) } } {code}
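The scaladoc warning quoted above can be reproduced without Spark. SparkContext keeps local properties in an InheritableThreadLocal, whose value is copied into a child thread when that thread is *created*, not when a task is submitted to it. A minimal sketch (illustrative names, not Spark code) of why a thread pool makes the scheduler pool unpredictable:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LocalPropertyDemo {
    public static void main(String[] args) throws Exception {
        // Stands in for SparkContext.localProperties / SPARK_SCHEDULER_POOL.
        InheritableThreadLocal<String> schedulerPool = new InheritableThreadLocal<>();
        schedulerPool.set("default");

        ExecutorService executor = Executors.newSingleThreadExecutor();
        // The single worker thread is created during this first submit, so it
        // inherits the value the submitting thread holds right now: "default".
        System.out.println(executor.submit(schedulerPool::get).get());

        // Changing the property in the submitting thread does NOT update the
        // already-created worker thread, so later tasks still see "default"
        // and would run in the wrong scheduler pool.
        schedulerPool.set("normal");
        System.out.println(executor.submit(schedulerPool::get).get());

        executor.shutdown();
    }
}
```

This is why the Thrift server, which reuses pooled threads across queries, can submit a query to a stale pool: the fix has to re-apply the pool property inside the task itself rather than rely on thread-creation-time inheritance.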
[jira] [Commented] (SPARK-26703) Hive record writer always depends on the parquet-1.6 writer; this should be fixed
[ https://issues.apache.org/jira/browse/SPARK-26703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16766682#comment-16766682 ] zhoukang commented on SPARK-26703: -- As far as I know, [~hyukjin.kwon], Hive still depends on the twitter version of parquet for HiveFileFormatUtils > Hive record writer always depends on the parquet-1.6 writer; this should be fixed > --- > > Key: SPARK-26703 > URL: https://issues.apache.org/jira/browse/SPARK-26703 > Project: Spark > Issue Type: Improvement > Components: SQL > Affects Versions: 2.3.2, 2.4.0 > Reporter: zhoukang > Priority: Major > > Currently, when using an INSERT INTO Hive table command, the generated Parquet file is always version 1.6. The reason is below: > 1. we rely on hive-exec's HiveFileFormatUtils to get the recordWriter > {code:java} > private val hiveWriter = HiveFileFormatUtils.getHiveRecordWriter( > jobConf, > tableDesc, > serializer.getSerializedClass, > fileSinkConf, > new Path(path), > Reporter.NULL) > {code} > 2. we will call > {code:java} > public static RecordWriter getHiveRecordWriter(JobConf jc, > TableDesc tableInfo, Class outputClass, > FileSinkDesc conf, Path outPath, Reporter reporter) throws > HiveException { > HiveOutputFormat hiveOutputFormat = getHiveOutputFormat(jc, > tableInfo); > try { > boolean isCompressed = conf.getCompressed(); > JobConf jc_output = jc; > if (isCompressed) { > jc_output = new JobConf(jc); > String codecStr = conf.getCompressCodec(); > if (codecStr != null && !codecStr.trim().equals("")) { > Class codec = > (Class) > JavaUtils.loadClass(codecStr); > FileOutputFormat.setOutputCompressorClass(jc_output, codec); > } > String type = conf.getCompressType(); > if (type != null && !type.trim().equals("")) { > CompressionType style = CompressionType.valueOf(type); > SequenceFileOutputFormat.setOutputCompressionType(jc, style); > } > } > return getRecordWriter(jc_output, hiveOutputFormat, outputClass, > isCompressed, tableInfo.getProperties(), outPath, reporter); > } catch
(Exception e) { > throw new HiveException(e); > } > } > public static RecordWriter getRecordWriter(JobConf jc, > OutputFormat outputFormat, > Class valueClass, boolean isCompressed, > Properties tableProp, Path outPath, Reporter reporter > ) throws IOException, HiveException { > if (!(outputFormat instanceof HiveOutputFormat)) { > outputFormat = new HivePassThroughOutputFormat(outputFormat); > } > return ((HiveOutputFormat)outputFormat).getHiveRecordWriter( > jc, outPath, valueClass, isCompressed, tableProp, reporter); > } > {code} > 3. then in MapredParquetOutputFormat > {code:java} > public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter > getHiveRecordWriter( > final JobConf jobConf, > final Path finalOutPath, > final Class valueClass, > final boolean isCompressed, > final Properties tableProperties, > final Progressable progress) throws IOException { > LOG.info("creating new record writer..." + this); > final String columnNameProperty = > tableProperties.getProperty(IOConstants.COLUMNS); > final String columnTypeProperty = > tableProperties.getProperty(IOConstants.COLUMNS_TYPES); > List columnNames; > List columnTypes; > if (columnNameProperty.length() == 0) { > columnNames = new ArrayList(); > } else { > columnNames = Arrays.asList(columnNameProperty.split(",")); > } > if (columnTypeProperty.length() == 0) { > columnTypes = new ArrayList(); > } else { > columnTypes = > TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); > } > > DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, > columnTypes), jobConf); > return getParquerRecordWriterWrapper(realOutputFormat, jobConf, > finalOutPath.toString(), > progress,tableProperties); > } > {code} > 4.
then call > {code:java} > public ParquetRecordWriterWrapper( > final OutputFormat realOutputFormat, > final JobConf jobConf, > final String name, > final Progressable progress, Properties tableProperties) throws > IOException { > try { > // create a TaskInputOutputContext > TaskAttemptID taskAttemptID = > TaskAttemptID.forName(jobConf.get("mapred.task.id")); > if (taskAttemptID == null) { > taskAttemptID = new TaskAttemptID(); > } > taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID); > LOG.info("initialize serde with table properties."); >
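A possible mitigation, sketched as a configuration fragment under the assumption that the target table can be handled by Spark's native Parquet data source: let Spark convert the metastore Parquet table so inserts go through Spark's own Parquet writer instead of the Hive record-writer chain quoted above. Whether this conversion applies depends on the Spark version and table layout (partitioned inserts in particular may still fall back to the Hive writer, which is exactly the path this issue describes); `some_parquet_table` and `src` are placeholder names.

```java
import org.apache.spark.sql.SparkSession;

public class NativeParquetWrite {
    public static void main(String[] args) {
        // spark.sql.hive.convertMetastoreParquet asks Spark to use its own
        // Parquet support for metastore Parquet tables where possible, instead
        // of Hive's bundled (parquet-1.6) record writer.
        SparkSession spark = SparkSession.builder()
                .appName("native-parquet-write")
                .enableHiveSupport()
                .config("spark.sql.hive.convertMetastoreParquet", "true")
                .getOrCreate();

        // Placeholder statement: when conversion applies, this insert is written
        // by Spark's ParquetFileFormat rather than MapredParquetOutputFormat.
        spark.sql("INSERT INTO TABLE some_parquet_table SELECT * FROM src");
        spark.stop();
    }
}
```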
[jira] [Updated] (SPARK-26756) Support session conf for thriftserver
[ https://issues.apache.org/jira/browse/SPARK-26756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-26756: - Description: We can add support for session-level conf, like: {code:java} set spark.sql.xxx.xxx=xxx {code} which can be useful. was: We can add support for session conf.like: {code:java} set spark.sql.xxx.xxx=xxx {code} which i think can be useful. > Support session conf for thriftserver > - > > Key: SPARK-26756 > URL: https://issues.apache.org/jira/browse/SPARK-26756 > Project: Spark > Issue Type: Improvement > Components: SQL > Affects Versions: 2.4.0 > Reporter: zhoukang > Priority: Major > > We can add support for session-level conf, like: > {code:java} > set spark.sql.xxx.xxx=xxx > {code} > which can be useful.
[jira] [Created] (SPARK-26756) Support session conf for thriftserver
zhoukang created SPARK-26756: Summary: Support session conf for thriftserver Key: SPARK-26756 URL: https://issues.apache.org/jira/browse/SPARK-26756 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.0 Reporter: zhoukang We can add support for session-level conf, like: {code:java} set spark.sql.xxx.xxx=xxx {code} which I think can be useful.
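The requested behavior is a per-session overlay on top of the global conf: a SET issued over one JDBC connection should affect only that session, with lookups falling back to the shared defaults. A toy sketch of that lookup order (plain Java maps, not the Thrift server's actual data structures; keys and values are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class SessionConfSketch {
    // Global defaults shared by all sessions.
    static final Map<String, String> globalConf =
        new HashMap<>(Map.of("spark.sql.shuffle.partitions", "200"));

    // Per-session overlay: SET writes here; reads fall back to globalConf.
    static final Map<String, Map<String, String>> sessionConf = new HashMap<>();

    static void set(String session, String key, String value) {
        sessionConf.computeIfAbsent(session, s -> new HashMap<>()).put(key, value);
    }

    static String get(String session, String key) {
        return sessionConf.getOrDefault(session, Map.of())
                          .getOrDefault(key, globalConf.get(key));
    }

    public static void main(String[] args) {
        // "SET spark.sql.shuffle.partitions=8" issued in one session only.
        set("session-1", "spark.sql.shuffle.partitions", "8");
        System.out.println(get("session-1", "spark.sql.shuffle.partitions"));
        // Other sessions still see the global default.
        System.out.println(get("session-2", "spark.sql.shuffle.partitions"));
    }
}
```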
[jira] [Updated] (SPARK-26751) HiveSessionImpl might have a memory leak since Operations are not closed properly
[ https://issues.apache.org/jira/browse/SPARK-26751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-26751: - Description: When an operation runs in the background and throws an exception that is not a HiveSQLException, we may get a memory leak, since the entry in handleToOperation is never removed. The reason is below: 1. operation.run throws an exception that is not a HiveSQLException 2. the opHandle is then never added to opHandleSet, and operationManager.closeOperation(opHandle) is not called {code:java} private OperationHandle executeStatementInternal(String statement, Map confOverlay, boolean runAsync) throws HiveSQLException { this.acquire(true); OperationManager operationManager = this.getOperationManager(); ExecuteStatementOperation operation = operationManager.newExecuteStatementOperation(this.getSession(), statement, confOverlay, runAsync); OperationHandle opHandle = operation.getHandle(); OperationHandle e; try { operation.run(); this.opHandleSet.add(opHandle); e = opHandle; } catch (HiveSQLException var11) { operationManager.closeOperation(opHandle); throw var11; } finally { this.release(true); } return e; } try { // This submit blocks if no background threads are available to run this operation val backgroundHandle = parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation) setBackgroundHandle(backgroundHandle) } catch { case rejected: RejectedExecutionException => setState(OperationState.ERROR) throw new HiveSQLException("The background threadpool cannot accept" + " new task for execution, please retry the operation", rejected) case NonFatal(e) => logError(s"Error executing query in background", e) setState(OperationState.ERROR) throw e } } {code} 3. when the session is closed, operationManager.closeOperation(opHandle) is not called either, since this opHandle was never added to opHandleSet.
{code} public void close() throws HiveSQLException { try { this.acquire(true); Iterator ioe = this.opHandleSet.iterator(); while(ioe.hasNext()) { OperationHandle opHandle = (OperationHandle)ioe.next(); this.operationManager.closeOperation(opHandle); } this.opHandleSet.clear(); this.cleanupSessionLogDir(); this.cleanupPipeoutFile(); HiveHistory ioe1 = this.sessionState.getHiveHistory(); if(null != ioe1) { ioe1.closeStream(); } try { this.sessionState.close(); } finally { this.sessionState = null; } } catch (IOException var17) { throw new HiveSQLException("Failure to close", var17); } finally { if(this.sessionState != null) { try { this.sessionState.close(); } catch (Throwable var15) { LOG.warn("Error closing session", var15); } this.sessionState = null; } this.release(true); } } {code} 4. however, the opHandle was added to handleToOperation for each statement {code} val handleToOperation = ReflectionUtils .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation") val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]() val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]() override def newExecuteStatementOperation( parentSession: HiveSession, statement: String, confOverlay: JMap[String, String], async: Boolean): ExecuteStatementOperation = synchronized { val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + s" initialized or had already closed.") val conf = sqlContext.sessionState.conf val hiveSessionState = parentSession.getSessionState setConfMap(conf, hiveSessionState.getOverriddenConfigurations) setConfMap(conf, hiveSessionState.getHiveVariables) val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC) val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)(sqlContext, sessionToActivePool)
handleToOperation.put(operation.getHandle, operation) logDebug(s"Created Operation for $statement with session=$parentSession, " + s"runInBackground=$runInBackground") operation } {code} Below is an example which has a memory leak: was: When we run in
[jira] [Updated] (SPARK-26751) HiveSessionImpl might have a memory leak since Operations are not closed properly
[ https://issues.apache.org/jira/browse/SPARK-26751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-26751: - Description: When an operation runs in the background and throws an exception that is not a HiveSQLException, we may get a memory leak, since the entry in handleToOperation is never removed. The reason is below: 1. operation.run throws an exception that is not a HiveSQLException 2. the opHandle is then never added to opHandleSet, and operationManager.closeOperation(opHandle) is not called {code:java} private OperationHandle executeStatementInternal(String statement, Map confOverlay, boolean runAsync) throws HiveSQLException { this.acquire(true); OperationManager operationManager = this.getOperationManager(); ExecuteStatementOperation operation = operationManager.newExecuteStatementOperation(this.getSession(), statement, confOverlay, runAsync); OperationHandle opHandle = operation.getHandle(); OperationHandle e; try { operation.run(); this.opHandleSet.add(opHandle); e = opHandle; } catch (HiveSQLException var11) { operationManager.closeOperation(opHandle); throw var11; } finally { this.release(true); } return e; } try { // This submit blocks if no background threads are available to run this operation val backgroundHandle = parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation) setBackgroundHandle(backgroundHandle) } catch { case rejected: RejectedExecutionException => setState(OperationState.ERROR) throw new HiveSQLException("The background threadpool cannot accept" + " new task for execution, please retry the operation", rejected) case NonFatal(e) => logError(s"Error executing query in background", e) setState(OperationState.ERROR) throw e } } {code} 3. when the session is closed, operationManager.closeOperation(opHandle) is not called either, since this opHandle was never added to opHandleSet.
{code} public void close() throws HiveSQLException { try { this.acquire(true); Iterator ioe = this.opHandleSet.iterator(); while(ioe.hasNext()) { OperationHandle opHandle = (OperationHandle)ioe.next(); this.operationManager.closeOperation(opHandle); } this.opHandleSet.clear(); this.cleanupSessionLogDir(); this.cleanupPipeoutFile(); HiveHistory ioe1 = this.sessionState.getHiveHistory(); if(null != ioe1) { ioe1.closeStream(); } try { this.sessionState.close(); } finally { this.sessionState = null; } } catch (IOException var17) { throw new HiveSQLException("Failure to close", var17); } finally { if(this.sessionState != null) { try { this.sessionState.close(); } catch (Throwable var15) { LOG.warn("Error closing session", var15); } this.sessionState = null; } this.release(true); } } {code} 4. however, the opHandle was added to handleToOperation for each statement {code} val handleToOperation = ReflectionUtils .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation") val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]() val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]() override def newExecuteStatementOperation( parentSession: HiveSession, statement: String, confOverlay: JMap[String, String], async: Boolean): ExecuteStatementOperation = synchronized { val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + s" initialized or had already closed.") val conf = sqlContext.sessionState.conf val hiveSessionState = parentSession.getSessionState setConfMap(conf, hiveSessionState.getOverriddenConfigurations) setConfMap(conf, hiveSessionState.getHiveVariables) val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC) val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)(sqlContext, sessionToActivePool)
handleToOperation.put(operation.getHandle, operation) logDebug(s"Created Operation for $statement with session=$parentSession, " + s"runInBackground=$runInBackground") operation } {code} Below is an example which has a memory leak: !26751.png! was:
[jira] [Updated] (SPARK-26751) HiveSessionImpl might have a memory leak since Operations are not closed properly
[ https://issues.apache.org/jira/browse/SPARK-26751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-26751: - Attachment: 26751.png > HiveSessionImpl might have a memory leak since Operations are not closed properly > > > Key: SPARK-26751 > URL: https://issues.apache.org/jira/browse/SPARK-26751 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.4.0 > Reporter: zhoukang > Priority: Major > Attachments: 26751.png > > > When an operation runs in the background and throws an exception that is not a HiveSQLException, we may get a memory leak, since the entry in handleToOperation is never removed. > The reason is below: > 1. operation.run throws an exception that is not a HiveSQLException > 2. the opHandle is then never added to opHandleSet, and > operationManager.closeOperation(opHandle) is not called > {code:java} > private OperationHandle executeStatementInternal(String statement, > Map confOverlay, boolean runAsync) throws HiveSQLException { > this.acquire(true); > OperationManager operationManager = this.getOperationManager(); > ExecuteStatementOperation operation = > operationManager.newExecuteStatementOperation(this.getSession(), statement, > confOverlay, runAsync); > OperationHandle opHandle = operation.getHandle(); > OperationHandle e; > try { > operation.run(); > this.opHandleSet.add(opHandle); > e = opHandle; > } catch (HiveSQLException var11) { > operationManager.closeOperation(opHandle); > throw var11; > } finally { > this.release(true); > } > return e; > } > try { > // This submit blocks if no background threads are available to run > this operation > val backgroundHandle = > > parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation) > setBackgroundHandle(backgroundHandle) > } catch { > case rejected: RejectedExecutionException => > setState(OperationState.ERROR) > throw new HiveSQLException("The background threadpool cannot > accept" + > " new task for execution, please
retry the operation", rejected) > case NonFatal(e) => > logError(s"Error executing query in background", e) > setState(OperationState.ERROR) > throw e > } > } > {code} > 3. when the session is closed, operationManager.closeOperation(opHandle) is not called either, since this opHandle > was never added to opHandleSet. > {code} > public void close() throws HiveSQLException { > try { > this.acquire(true); > Iterator ioe = this.opHandleSet.iterator(); > while(ioe.hasNext()) { > OperationHandle opHandle = (OperationHandle)ioe.next(); > this.operationManager.closeOperation(opHandle); > } > this.opHandleSet.clear(); > this.cleanupSessionLogDir(); > this.cleanupPipeoutFile(); > HiveHistory ioe1 = this.sessionState.getHiveHistory(); > if(null != ioe1) { > ioe1.closeStream(); > } > try { > this.sessionState.close(); > } finally { > this.sessionState = null; > } > } catch (IOException var17) { > throw new HiveSQLException("Failure to close", var17); > } finally { > if(this.sessionState != null) { > try { > this.sessionState.close(); > } catch (Throwable var15) { > LOG.warn("Error closing session", var15); > } > this.sessionState = null; > } > this.release(true); > } > } > {code} > 4.
however, the opHandle was added to handleToOperation for each statement > {code} > val handleToOperation = ReflectionUtils > .getSuperField[JMap[OperationHandle, Operation]](this, > "handleToOperation") > val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]() > val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]() > override def newExecuteStatementOperation( > parentSession: HiveSession, > statement: String, > confOverlay: JMap[String, String], > async: Boolean): ExecuteStatementOperation = synchronized { > val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) > require(sqlContext != null, s"Session handle: > ${parentSession.getSessionHandle} has not been" + > s" initialized or had already closed.") > val conf = sqlContext.sessionState.conf >
[jira] [Created] (SPARK-26751) HiveSessionImpl might have a memory leak since Operations are not closed properly
zhoukang created SPARK-26751: Summary: HiveSessionImpl might have a memory leak since Operations are not closed properly Key: SPARK-26751 URL: https://issues.apache.org/jira/browse/SPARK-26751 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.4.0 Reporter: zhoukang When an operation runs in the background and throws an exception that is not a HiveSQLException, we may get a memory leak, since the entry in handleToOperation is never removed. The reason is below: 1. operation.run throws an exception that is not a HiveSQLException 2. the opHandle is then never added to opHandleSet, and operationManager.closeOperation(opHandle) is not called {code:java} private OperationHandle executeStatementInternal(String statement, Map confOverlay, boolean runAsync) throws HiveSQLException { this.acquire(true); OperationManager operationManager = this.getOperationManager(); ExecuteStatementOperation operation = operationManager.newExecuteStatementOperation(this.getSession(), statement, confOverlay, runAsync); OperationHandle opHandle = operation.getHandle(); OperationHandle e; try { operation.run(); this.opHandleSet.add(opHandle); e = opHandle; } catch (HiveSQLException var11) { operationManager.closeOperation(opHandle); throw var11; } finally { this.release(true); } return e; } try { // This submit blocks if no background threads are available to run this operation val backgroundHandle = parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation) setBackgroundHandle(backgroundHandle) } catch { case rejected: RejectedExecutionException => setState(OperationState.ERROR) throw new HiveSQLException("The background threadpool cannot accept" + " new task for execution, please retry the operation", rejected) case NonFatal(e) => logError(s"Error executing query in background", e) setState(OperationState.ERROR) throw e } } {code} 3.
when the session is closed, operationManager.closeOperation(opHandle) is not called either, since this opHandle was never added to opHandleSet. {code} public void close() throws HiveSQLException { try { this.acquire(true); Iterator ioe = this.opHandleSet.iterator(); while(ioe.hasNext()) { OperationHandle opHandle = (OperationHandle)ioe.next(); this.operationManager.closeOperation(opHandle); } this.opHandleSet.clear(); this.cleanupSessionLogDir(); this.cleanupPipeoutFile(); HiveHistory ioe1 = this.sessionState.getHiveHistory(); if(null != ioe1) { ioe1.closeStream(); } try { this.sessionState.close(); } finally { this.sessionState = null; } } catch (IOException var17) { throw new HiveSQLException("Failure to close", var17); } finally { if(this.sessionState != null) { try { this.sessionState.close(); } catch (Throwable var15) { LOG.warn("Error closing session", var15); } this.sessionState = null; } this.release(true); } } {code} 4. however, the opHandle was added to handleToOperation for each statement {code} val handleToOperation = ReflectionUtils .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation") val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]() val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]() override def newExecuteStatementOperation( parentSession: HiveSession, statement: String, confOverlay: JMap[String, String], async: Boolean): ExecuteStatementOperation = synchronized { val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + s" initialized or had already closed.") val conf = sqlContext.sessionState.conf val hiveSessionState = parentSession.getSessionState setConfMap(conf, hiveSessionState.getOverriddenConfigurations) setConfMap(conf, hiveSessionState.getHiveVariables) val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC) val operation = new
SparkExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)(sqlContext, sessionToActivePool) handleToOperation.put(operation.getHandle, operation)
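The four steps above can be condensed into a runnable toy (illustrative names; HiveSQLException is stubbed, not the real Hive class): cleanup only happens in the catch for HiveSQLException, so any other exception (for example a RejectedExecutionException from the background thread pool) leaves the handle registered in handleToOperation forever, and close() cannot clean it up because it was never added to opHandleSet.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class OperationLeakDemo {
    static class HiveSQLException extends Exception {}

    interface Op { void run() throws Exception; }

    static final Map<Integer, String> handleToOperation = new HashMap<>();
    static final Set<Integer> opHandleSet = new HashSet<>();

    // Mirrors executeStatementInternal: only HiveSQLException triggers cleanup.
    static void execute(int handle, Op op) throws Exception {
        handleToOperation.put(handle, "operation");   // step 4: always registered
        try {
            op.run();
            opHandleSet.add(handle);                  // step 2: tracked only on success
        } catch (HiveSQLException e) {
            handleToOperation.remove(handle);         // cleaned up for this type only
            throw e;
        }
        // Any other exception escapes here without removing the entry.
    }

    // Mirrors HiveSessionImpl.close(): can only clean handles in opHandleSet.
    static void closeSession() {
        for (int h : opHandleSet) {
            handleToOperation.remove(h);
        }
        opHandleSet.clear();
    }

    public static void main(String[] args) {
        try {
            // step 1: a non-HiveSQLException escapes execute() without cleanup
            execute(1, () -> { throw new RuntimeException("threadpool rejected"); });
        } catch (Exception ignored) {}
        closeSession();                               // step 3: handle 1 is not in opHandleSet
        System.out.println(handleToOperation.size()); // the leaked entry remains
    }
}
```

One fix direction consistent with the report is to widen the catch (or use a finally) so the handleToOperation entry is removed for every failure, not just HiveSQLException.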
[jira] [Created] (SPARK-26703) Hive record writer always depends on the parquet-1.6 writer; this should be fixed
zhoukang created SPARK-26703: Summary: Hive record writer will always depends on parquet-1.6 writer should fix it Key: SPARK-26703 URL: https://issues.apache.org/jira/browse/SPARK-26703 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.0, 2.3.2 Reporter: zhoukang

Currently, when we use an "insert into" Hive-table command, the parquet files generated will always be version 1.6. The reason is below:
1. we rely on hive-exec HiveFileFormatUtils to get the recordWriter:
{code:java}
private val hiveWriter = HiveFileFormatUtils.getHiveRecordWriter(
  jobConf,
  tableDesc,
  serializer.getSerializedClass,
  fileSinkConf,
  new Path(path),
  Reporter.NULL)
{code}
2. we will call:
{code:java}
public static RecordWriter getHiveRecordWriter(JobConf jc, TableDesc tableInfo,
    Class outputClass, FileSinkDesc conf, Path outPath, Reporter reporter)
    throws HiveException {
  HiveOutputFormat hiveOutputFormat = getHiveOutputFormat(jc, tableInfo);
  try {
    boolean isCompressed = conf.getCompressed();
    JobConf jc_output = jc;
    if (isCompressed) {
      jc_output = new JobConf(jc);
      String codecStr = conf.getCompressCodec();
      if (codecStr != null && !codecStr.trim().equals("")) {
        Class codec = (Class) JavaUtils.loadClass(codecStr);
        FileOutputFormat.setOutputCompressorClass(jc_output, codec);
      }
      String type = conf.getCompressType();
      if (type != null && !type.trim().equals("")) {
        CompressionType style = CompressionType.valueOf(type);
        SequenceFileOutputFormat.setOutputCompressionType(jc, style);
      }
    }
    return getRecordWriter(jc_output, hiveOutputFormat, outputClass, isCompressed,
        tableInfo.getProperties(), outPath, reporter);
  } catch (Exception e) {
    throw new HiveException(e);
  }
}

public static RecordWriter getRecordWriter(JobConf jc, OutputFormat outputFormat,
    Class valueClass, boolean isCompressed, Properties tableProp, Path outPath,
    Reporter reporter) throws IOException, HiveException {
  if (!(outputFormat instanceof HiveOutputFormat)) {
    outputFormat = new HivePassThroughOutputFormat(outputFormat);
  }
  return ((HiveOutputFormat) outputFormat).getHiveRecordWriter(
      jc, outPath, valueClass, isCompressed, tableProp, reporter);
}
{code}
3. then in MapredParquetOutputFormat:
{code:java}
public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
    final JobConf jobConf,
    final Path finalOutPath,
    final Class valueClass,
    final boolean isCompressed,
    final Properties tableProperties,
    final Progressable progress) throws IOException {
  LOG.info("creating new record writer..." + this);
  final String columnNameProperty = tableProperties.getProperty(IOConstants.COLUMNS);
  final String columnTypeProperty = tableProperties.getProperty(IOConstants.COLUMNS_TYPES);
  List columnNames;
  List columnTypes;
  if (columnNameProperty.length() == 0) {
    columnNames = new ArrayList();
  } else {
    columnNames = Arrays.asList(columnNameProperty.split(","));
  }
  if (columnTypeProperty.length() == 0) {
    columnTypes = new ArrayList();
  } else {
    columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
  }
  DataWritableWriteSupport.setSchema(
      HiveSchemaConverter.convert(columnNames, columnTypes), jobConf);
  return getParquerRecordWriterWrapper(realOutputFormat, jobConf,
      finalOutPath.toString(), progress, tableProperties);
}
{code}
4. then call:
{code:java}
public ParquetRecordWriterWrapper(
    final OutputFormat realOutputFormat,
    final JobConf jobConf,
    final String name,
    final Progressable progress,
    Properties tableProperties) throws IOException {
  try {
    // create a TaskInputOutputContext
    TaskAttemptID taskAttemptID = TaskAttemptID.forName(jobConf.get("mapred.task.id"));
    if (taskAttemptID == null) {
      taskAttemptID = new TaskAttemptID();
    }
    taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);
    LOG.info("initialize serde with table properties.");
    initializeSerProperties(taskContext, tableProperties);
    LOG.info("creating real writer to write at " + name);
    realWriter =
        ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext, new Path(name));
    LOG.info("real writer: " + realWriter);
  } catch (final InterruptedException e) {
    throw new IOException(e);
  }
}
{code}
The ParquetOutputFormat here is version 1.6, so all generated files will miss some useful statistics, such as min/max for strings. We should fix this issue so we can use the new features of parquet.
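The dispatch in step 2 can be sketched in miniature. This is a toy model with hypothetical names, not the real Hive classes: any OutputFormat that is not already a HiveOutputFormat gets wrapped in a pass-through adapter, and the record writer then comes from whatever writer version the wrapped format carries, which is why the parquet version is pinned by the output format Hive picks.

```java
// Toy model (hypothetical names) of the instanceof-and-wrap dispatch above.
public class PassThroughDemo {
    public interface OutputFormat { String name(); }
    public interface HiveOutputFormat extends OutputFormat { String getHiveRecordWriter(); }

    public static class ParquetFormat implements OutputFormat {
        private final String version;
        public ParquetFormat(String version) { this.version = version; }
        public String name() { return "parquet-" + version; }
    }

    // Mirrors HivePassThroughOutputFormat: adapts a plain OutputFormat to Hive's API.
    public static class PassThrough implements HiveOutputFormat {
        private final OutputFormat underlying;
        public PassThrough(OutputFormat underlying) { this.underlying = underlying; }
        public String name() { return underlying.name(); }
        public String getHiveRecordWriter() { return "writer backed by " + underlying.name(); }
    }

    public static String getRecordWriter(OutputFormat of) {
        if (!(of instanceof HiveOutputFormat)) { // the instanceof check in step 2
            of = new PassThrough(of);
        }
        return ((HiveOutputFormat) of).getHiveRecordWriter();
    }

    public static void main(String[] args) {
        // The writer version is whatever the chosen format bundles:
        System.out.println(getRecordWriter(new ParquetFormat("1.6")));
    }
}
```

The point of the model: upgrading the writer means changing which format (and bundled parquet version) this dispatch resolves to, not changing the dispatch itself.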
[jira] [Updated] (SPARK-26601) Make broadcast-exchange thread pool keepalivetime and maxThreadNumber configurable
[ https://issues.apache.org/jira/browse/SPARK-26601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-26601: - Summary: Make broadcast-exchange thread pool keepalivetime and maxThreadNumber configurable (was: Make broadcast-exchange thread pool keepalivetime configurable)

> Make broadcast-exchange thread pool keepalivetime and maxThreadNumber
> configurable
> --
>
> Key: SPARK-26601
> URL: https://issues.apache.org/jira/browse/SPARK-26601
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: zhoukang
> Priority: Major
> Attachments: 选区_001.png, 选区_002 (1).png, 选区_002.png
>
> Currently, the thread number of the broadcast-exchange thread pool is fixed, and keepAliveSeconds is also fixed as 60s.
> {code:java}
> object BroadcastExchangeExec {
>   private[execution] val executionContext = ExecutionContext.fromExecutorService(
>     ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
> }
>
> /**
>  * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
>  * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
>  */
> def newDaemonCachedThreadPool(
>     prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
>   val threadFactory = namedThreadFactory(prefix)
>   val threadPool = new ThreadPoolExecutor(
>     maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
>     maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used
>     keepAliveSeconds,
>     TimeUnit.SECONDS,
>     new LinkedBlockingQueue[Runnable],
>     threadFactory)
>   threadPool.allowCoreThreadTimeOut(true)
>   threadPool
> }
> {code}
> But sometimes, if the Thread objects are not GCed quickly, this may cause server (driver) OOM.
> Below is an example: -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
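A configurable variant of the pool above could look like the following sketch. It keeps the same shape as ThreadUtils.newDaemonCachedThreadPool, but the max thread number and keep-alive time come from the caller (e.g. read from configuration) instead of being hard-coded to 128 and 60s; the config plumbing and any property names are left out because they would be hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: same pool construction as the Scala snippet above, with the two
// hard-coded values turned into parameters.
public class ConfigurablePoolDemo {
    public static ThreadPoolExecutor newDaemonCachedThreadPool(
            String prefix, int maxThreadNumber, int keepAliveSeconds) {
        AtomicInteger counter = new AtomicInteger(0);
        ThreadFactory factory = runnable -> {
            // Thread names are formatted as prefix-ID, as in namedThreadFactory.
            Thread t = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxThreadNumber,              // corePoolSize
                maxThreadNumber,              // maximumPoolSize (unused with an unbounded queue)
                keepAliveSeconds,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                factory);
        pool.allowCoreThreadTimeOut(true);    // lets idle core threads exit after keepAlive
        return pool;
    }

    public static void main(String[] args) {
        // Values that would come from configuration rather than constants:
        ThreadPoolExecutor pool = newDaemonCachedThreadPool("broadcast-exchange", 32, 30);
        System.out.println(pool.getMaximumPoolSize()); // 32
        pool.shutdown();
    }
}
```

With allowCoreThreadTimeOut(true), a shorter keep-alive lets idle broadcast threads (and the objects they retain) be released sooner, which is the OOM mitigation the issue asks for.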
[jira] [Updated] (SPARK-26601) Make broadcast-exchange thread pool keepalivetime and maxThreadNumber configurable
[ https://issues.apache.org/jira/browse/SPARK-26601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-26601: - Attachment: 选区_002 (1).png 选区_002.png 选区_001.png
[jira] [Updated] (SPARK-26601) Make broadcast-exchange thread pool keepalivetime and maxThreadNumber configurable
[ https://issues.apache.org/jira/browse/SPARK-26601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-26601: - Description: added "In such case, we need to make this thread pool configurable." after the OOM note; the rest of the description is unchanged.
[jira] [Updated] (SPARK-26601) Make broadcast-exchange thread pool keepalivetime and maxThreadNumber configurable
[ https://issues.apache.org/jira/browse/SPARK-26601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-26601: - Description: added the image references !26601-occupy.png! !26601-largeobject.png! !26601-path2gcroot.png! after "Below is an example:"; the rest of the description is unchanged.
[jira] [Updated] (SPARK-26601) Make broadcast-exchange thread pool keepalivetime and maxThreadNumber configurable
[ https://issues.apache.org/jira/browse/SPARK-26601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-26601: - Attachment: (was: 选区_002.png)
[jira] [Updated] (SPARK-26601) Make broadcast-exchange thread pool keepalivetime and maxThreadNumber configurable
[ https://issues.apache.org/jira/browse/SPARK-26601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-26601: - Attachment: 26601-path2gcroot.png 26601-occupy.png 26601-largeobject.png
[jira] [Updated] (SPARK-26601) Make broadcast-exchange thread pool keepalivetime and maxThreadNumber configurable
[ https://issues.apache.org/jira/browse/SPARK-26601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-26601: - Attachment: (was: 选区_001.png)
[jira] [Updated] (SPARK-26601) Make broadcast-exchange thread pool keepalivetime and maxThreadNumber configurable
[ https://issues.apache.org/jira/browse/SPARK-26601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-26601: - Attachment: (was: 选区_002 (1).png)
[jira] [Created] (SPARK-26601) Make broadcast-exchange thread pool keepalivetime configurable
zhoukang created SPARK-26601: Summary: Make broadcast-exchange thread pool keepalivetime configurable Key: SPARK-26601 URL: https://issues.apache.org/jira/browse/SPARK-26601 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.0 Reporter: zhoukang
[jira] [Updated] (SPARK-26568) Too many partitions may cause thriftServer frequently Full GC
[ https://issues.apache.org/jira/browse/SPARK-26568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-26568: - Description: The reason is that: first, we have a table with many partitions (maybe several hundred); second, we have some concurrent queries. Then the long-running thriftServer may encounter an OOM issue. Here is a case, the call stack of the OOM thread:
{code:java}
pool-34-thread-10
  at org.apache.hadoop.hive.metastore.api.StorageDescriptor.<init>(Lorg/apache/hadoop/hive/metastore/api/StorageDescriptor;)V (StorageDescriptor.java:240)
  at org.apache.hadoop.hive.metastore.api.Partition.<init>(Lorg/apache/hadoop/hive/metastore/api/Partition;)V (Partition.java:216)
  at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.deepCopy(Lorg/apache/hadoop/hive/metastore/api/Partition;)Lorg/apache/hadoop/hive/metastore/api/Partition; (HiveMetaStoreClient.java:1343)
  at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.deepCopyPartitions(Ljava/util/Collection;Ljava/util/List;)Ljava/util/List; (HiveMetaStoreClient.java:1409)
  at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.deepCopyPartitions(Ljava/util/List;)Ljava/util/List; (HiveMetaStoreClient.java:1397)
  at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.listPartitionsByFilter(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;S)Ljava/util/List; (HiveMetaStoreClient.java:914)
  at sun.reflect.GeneratedMethodAccessor98.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object; (Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object; (DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object; (Method.java:606)
  at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(Ljava/lang/Object;Ljava/lang/reflect/Method;[Ljava/lang/Object;)Ljava/lang/Object; (RetryingMetaStoreClient.java:90)
  at com.sun.proxy.$Proxy30.listPartitionsByFilter(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;S)Ljava/util/List; (Unknown Source)
  at org.apache.hadoop.hive.ql.metadata.Hive.getPartitionsByFilter(Lorg/apache/hadoop/hive/ql/metadata/Table;Ljava/lang/String;)Ljava/util/List; (Hive.java:1967)
  at sun.reflect.GeneratedMethodAccessor97.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object; (Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object; (DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object; (Method.java:606)
  at org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(Lorg/apache/hadoop/hive/ql/metadata/Hive;Lorg/apache/hadoop/hive/ql/metadata/Table;Lscala/collection/Seq;)Lscala/collection/Seq; (HiveShim.scala:602)
  at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitionsByFilter$1.apply()Lscala/collection/Seq; (HiveClientImpl.scala:608)
  at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitionsByFilter$1.apply()Ljava/lang/Object; (HiveClientImpl.scala:606)
  at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply()Ljava/lang/Object; (HiveClientImpl.scala:321)
  at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(Lscala/Function0;Lscala/runtime/IntRef;Lscala/runtime/ObjectRef;Ljava/lang/Object;)V (HiveClientImpl.scala:264)
  at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(Lscala/Function0;)Ljava/lang/Object; (HiveClientImpl.scala:263)
  at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(Lscala/Function0;)Ljava/lang/Object; (HiveClientImpl.scala:307)
  at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionsByFilter(Lorg/apache/spark/sql/catalyst/catalog/CatalogTable;Lscala/collection/Seq;)Lscala/collection/Seq; (HiveClientImpl.scala:606)
  at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$listPartitionsByFilter$1.apply()Lscala/collection/Seq; (HiveExternalCatalog.scala:1017)
  at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$listPartitionsByFilter$1.apply()Ljava/lang/Object; (HiveExternalCatalog.scala:1000)
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(Lscala/Function0;)Ljava/lang/Object; (HiveExternalCatalog.scala:100)
  at org.apache.spark.sql.hive.HiveExternalCatalog.listPartitionsByFilter(Ljava/lang/String;Ljava/lang/String;Lscala/collection/Seq;)Lscala/collection/Seq; (HiveExternalCatalog.scala:1000)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitionsByFilter(Lorg/apache/spark/sql/catalyst/TableIdentifier;Lscala/collection/Seq;)Lscala/collection/Seq; (SessionCatalog.scala:803)
  at
{code}
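The stack shows the cost: HiveMetaStoreClient deep-copies every partition it returns, so several concurrent queries over a table with hundreds of partitions can each materialize the full partition list in the thrift server. The following toy model (hypothetical names, not the real metastore classes) illustrates why pruning partitions before the copy bounds the memory per query.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy model (hypothetical names): C concurrent queries over P partitions can
// hold roughly C * P copied partition objects unless the filter prunes
// partitions before the per-query copy.
public class PartitionCopyDemo {
    public static class Partition {
        public final String dt;
        public final String location;
        public Partition(String dt, String location) { this.dt = dt; this.location = location; }
        // Models the per-partition copy done by the metastore client.
        public Partition deepCopy() { return new Partition(dt, location); }
    }

    public static List<Partition> allPartitions(int n) {
        List<Partition> ps = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            ps.add(new Partition(String.format("2019-01-%02d", i % 31 + 1), "/warehouse/t/p" + i));
        }
        return ps;
    }

    // Copies only the partitions that match the filter; with filter = always-true
    // this degenerates into copying the whole table, which is the OOM scenario.
    public static List<Partition> fetch(List<Partition> all, Predicate<Partition> filter) {
        List<Partition> out = new ArrayList<>();
        for (Partition p : all) {
            if (filter.test(p)) {
                out.add(p.deepCopy());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Partition> all = allPartitions(500);
        int pruned = fetch(all, p -> p.dt.equals("2019-01-01")).size();
        int unpruned = fetch(all, p -> true).size();
        // Each concurrent query pays the "unpruned" cost when the filter is not pushed down.
        System.out.println(pruned + " copied vs " + unpruned + " copied");
    }
}
```

In the real system the equivalent mitigations are pushing the partition filter down to the metastore and capping how many partitions a single query may list.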
[jira] [Created] (SPARK-26568) Too many partitions may cause frequent thriftServer Full GCs
zhoukang created SPARK-26568: Summary: Too many partitions may cause frequent thriftServer Full GCs Key: SPARK-26568 URL: https://issues.apache.org/jira/browse/SPARK-26568 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.0 Reporter: zhoukang The reason is twofold: first, we have a table with many partitions (possibly several hundred); second, we run some concurrent queries. Under these conditions the long-running thriftServer may encounter an OOM. Here is a case; the call stack of the OOM thread: {code:java} pool-34-thread-10 at org.apache.hadoop.hive.metastore.api.StorageDescriptor.<init>(Lorg/apache/hadoop/hive/metastore/api/StorageDescriptor;)V (StorageDescriptor.java:240) at org.apache.hadoop.hive.metastore.api.Partition.<init>(Lorg/apache/hadoop/hive/metastore/api/Partition;)V (Partition.java:216) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.deepCopy(Lorg/apache/hadoop/hive/metastore/api/Partition;)Lorg/apache/hadoop/hive/metastore/api/Partition; (HiveMetaStoreClient.java:1343) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.deepCopyPartitions(Ljava/util/Collection;Ljava/util/List;)Ljava/util/List; (HiveMetaStoreClient.java:1409) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.deepCopyPartitions(Ljava/util/List;)Ljava/util/List; (HiveMetaStoreClient.java:1397) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.listPartitionsByFilter(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;S)Ljava/util/List; (HiveMetaStoreClient.java:914) at sun.reflect.GeneratedMethodAccessor98.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object; (Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object; (DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object; (Method.java:606) at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(Ljava/lang/Object;Ljava/lang/reflect/Method;[Ljava/lang/Object;)Ljava/lang/Object; (RetryingMetaStoreClient.java:90) at com.sun.proxy.$Proxy30.listPartitionsByFilter(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;S)Ljava/util/List; (Unknown Source) at org.apache.hadoop.hive.ql.metadata.Hive.getPartitionsByFilter(Lorg/apache/hadoop/hive/ql/metadata/Table;Ljava/lang/String;)Ljava/util/List; (Hive.java:1967) at sun.reflect.GeneratedMethodAccessor97.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object; (Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object; (DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object; (Method.java:606) at org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(Lorg/apache/hadoop/hive/ql/metadata/Hive;Lorg/apache/hadoop/hive/ql/metadata/Table;Lscala/collection/Seq;)Lscala/collection/Seq; (HiveShim.scala:602) at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitionsByFilter$1.apply()Lscala/collection/Seq; (HiveClientImpl.scala:608) at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitionsByFilter$1.apply()Ljava/lang/Object; (HiveClientImpl.scala:606) at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply()Ljava/lang/Object; (HiveClientImpl.scala:321) at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(Lscala/Function0;Lscala/runtime/IntRef;Lscala/runtime/ObjectRef;Ljava/lang/Object;)V (HiveClientImpl.scala:264) at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(Lscala/Function0;)Ljava/lang/Object; (HiveClientImpl.scala:263) at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(Lscala/Function0;)Ljava/lang/Object; (HiveClientImpl.scala:307) at 
org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionsByFilter(Lorg/apache/spark/sql/catalyst/catalog/CatalogTable;Lscala/collection/Seq;)Lscala/collection/Seq; (HiveClientImpl.scala:606) at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$listPartitionsByFilter$1.apply()Lscala/collection/Seq; (HiveExternalCatalog.scala:1017) at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$listPartitionsByFilter$1.apply()Ljava/lang/Object; (HiveExternalCatalog.scala:1000) at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(Lscala/Function0;)Ljava/lang/Object; (HiveExternalCatalog.scala:100) at org.apache.spark.sql.hive.HiveExternalCatalog.listPartitionsByFilter(Ljava/lang/String;Ljava/lang/String;Lscala/collection/Seq;)Lscala/collection/Seq; (HiveExternalCatalog.scala:1000) at
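To see why this stack leads to frequent Full GC, note that `HiveMetaStoreClient.deepCopyPartitions` clones the full `Partition`/`StorageDescriptor` metadata of every returned partition, for every concurrent query. A rough back-of-envelope sketch (the per-partition size is an assumption for illustration, not a measurement, and this is not Spark code):

```scala
// Back-of-envelope sketch (not Spark code) of the heap pressure: each
// concurrent listPartitionsByFilter call deep-copies all partition metadata.
object PartitionMemorySketch {
  // Assumed average retained size of one deep-copied Partition
  // (StorageDescriptor, columns, SerDe info, parameter maps) -- an assumption.
  val assumedBytesPerPartition: Long = 10L * 1024

  def retainedBytes(partitionsPerTable: Int, concurrentQueries: Int): Long =
    partitionsPerTable.toLong * concurrentQueries * assumedBytesPerPartition

  def main(args: Array[String]): Unit = {
    // e.g. 500 partitions per table, 50 concurrent queries listing them:
    val mb = retainedBytes(500, 50) / 1024.0 / 1024.0
    println(f"~$mb%.0f MB of transient metastore metadata on the driver heap")
  }
}
```

Even with these modest assumed numbers, the allocation churn from repeatedly materializing and discarding this metadata is enough to keep the old generation busy on a long-running server.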
[jira] [Created] (SPARK-26533) Support query auto cancel on thriftserver
zhoukang created SPARK-26533: Summary: Support query auto cancel on thriftserver Key: SPARK-26533 URL: https://issues.apache.org/jira/browse/SPARK-26533 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.0 Reporter: zhoukang Support automatically cancelling queries that run too long on the thriftserver. In some cases we use the thriftserver as a long-running application, and we want no query to run longer than a given time. In these cases we can enable auto-cancel for time-consuming queries, which lets us release resources for other queries to run. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
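A minimal sketch of what such auto-cancellation could look like: a watchdog schedules a cancellation per statement and drops it if the statement finishes in time. `QueryWatchdog` and the `cancel` callback are hypothetical names for illustration, not Spark Thrift server APIs:

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

// Hypothetical sketch: schedule a cancellation for each statement; cancel the
// timer if the statement finishes first. Not actual Thrift server code.
class QueryWatchdog(timeoutSeconds: Long) {
  private val scheduler = Executors.newScheduledThreadPool(1)

  /** Arrange for `cancel` to fire if the query outlives the timeout. */
  def watch(queryId: String, cancel: () => Unit): ScheduledFuture[_] =
    scheduler.schedule(new Runnable {
      override def run(): Unit = {
        println(s"Query $queryId exceeded ${timeoutSeconds}s; cancelling")
        cancel()
      }
    }, timeoutSeconds, TimeUnit.SECONDS)

  /** Call when the query completes in time, so the timer never fires. */
  def finished(handle: ScheduledFuture[_]): Unit = handle.cancel(false)

  def shutdown(): Unit = scheduler.shutdownNow()
}
```

The timeout would naturally come from a session- or server-level conf so that operators can tune it per deployment.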
[jira] [Created] (SPARK-26514) Introduce a new conf to improve the task parallelism
zhoukang created SPARK-26514: Summary: Introduce a new conf to improve the task parallelism Key: SPARK-26514 URL: https://issues.apache.org/jira/browse/SPARK-26514 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.4.0, 2.1.0 Reporter: zhoukang Currently, we have the conf below: {code:java} private[spark] val CPUS_PER_TASK = ConfigBuilder("spark.task.cpus").intConf.createWithDefault(1) {code} For applications that are not CPU-intensive, we may want to let one CPU run more than one task, for example: {code:java} private[spark] val TASKS_PER_CPU = ConfigBuilder("spark.cpu.tasks").intConf.createWithDefault(1) {code} This can improve performance for some applications and also improve resource utilization. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
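The effect on scheduling can be sketched as simple slot arithmetic. `spark.cpu.tasks` here is the setting proposed in this ticket, not an existing Spark conf, and the functions below are illustrative rather than actual scheduler code:

```scala
// Sketch of the proposal's slot math: today spark.task.cpus divides the
// executor's cores into slots; the proposed spark.cpu.tasks would instead
// multiply them, letting one core host several light tasks.
object SlotMath {
  /** Slots today: executor cores divided by spark.task.cpus. */
  def slotsWithCpusPerTask(executorCores: Int, cpusPerTask: Int): Int =
    executorCores / cpusPerTask

  /** Slots under the proposal: each core hosts spark.cpu.tasks tasks. */
  def slotsWithTasksPerCpu(executorCores: Int, tasksPerCpu: Int): Int =
    executorCores * tasksPerCpu
}
```

With 8 executor cores, `spark.cpu.tasks=2` would double the concurrent tasks per executor, which only helps when tasks spend most of their time blocked on I/O rather than computing.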
[jira] [Updated] (SPARK-24687) NoClassDefError thrown during task serialization causes job hang
[ https://issues.apache.org/jira/browse/SPARK-24687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-24687: - Description: When below exception thrown: {code:java} Exception in thread "dag-scheduler-event-loop" java.lang.NoClassDefFoundError: Lcom/xxx/data/recommend/aggregator/queue/QueueName; at java.lang.Class.getDeclaredFields0(Native Method) at java.lang.Class.privateGetDeclaredFields(Class.java:2436) at java.lang.Class.getDeclaredField(Class.java:1946) at java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659) at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72) at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480) at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468) at java.security.AccessController.doPrivileged(Native Method) at java.io.ObjectStreamClass.(ObjectStreamClass.java:468) at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365) at java.io.ObjectOutputStream.writeClass(ObjectOutputStream.java:1212) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1119) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) {code} Stage will always hang.Not abort. !hanging-960.png! 
This is because the NoClassDefFoundError is not caught in the code below. {code} var taskBinary: Broadcast[Array[Byte]] = null try { // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep). // For ResultTask, serialize and broadcast (rdd, func). val taskBinaryBytes: Array[Byte] = stage match { case stage: ShuffleMapStage => JavaUtils.bufferToArray( closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef)) case stage: ResultStage => JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef)) } taskBinary =
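The hang follows from the class hierarchy: `NoClassDefFoundError` extends `Error`, not `Exception`, so a handler that only catches exceptions lets it escape the dag-scheduler-event-loop thread without ever aborting the stage. A simplified, hypothetical guard (a stand-in, not the real DAGScheduler code) showing the missing catch:

```scala
// Simplified stand-in for task-binary serialization: the point is only that a
// linkage error must be caught and turned into a stage abort, not left to
// kill the event-loop thread silently.
object SerializeGuard {
  def serializeOrAbort(serialize: () => Array[Byte]): Either[String, Array[Byte]] =
    try Right(serialize())
    catch {
      case e: Exception =>
        Left(s"abortStage: task not serializable: ${e.getMessage}")
      // Without this case the error escapes and the job hangs instead of failing:
      case e: NoClassDefFoundError =>
        Left(s"abortStage: missing class during serialization: ${e.getMessage}")
    }
}
```

Returning an explicit abort (rather than letting the Error propagate) is what allows the scheduler to fail the stage and notify the user instead of hanging.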
[jira] [Created] (SPARK-24687) NoClassDefError thrown during task serialization causes job hang
zhoukang created SPARK-24687: Summary: When NoClassDefError thrown during task serialization will cause job hang Key: SPARK-24687 URL: https://issues.apache.org/jira/browse/SPARK-24687 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.1.1, 2.1.0 Reporter: zhoukang Attachments: hanging-960.png When below exception thrown: {code:java} Exception in thread "dag-scheduler-event-loop" java.lang.NoClassDefFoundError: Lcom/xxx/data/recommend/aggregator/queue/QueueName; at java.lang.Class.getDeclaredFields0(Native Method) at java.lang.Class.privateGetDeclaredFields(Class.java:2436) at java.lang.Class.getDeclaredField(Class.java:1946) at java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659) at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72) at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480) at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468) at java.security.AccessController.doPrivileged(Native Method) at java.io.ObjectStreamClass.(ObjectStreamClass.java:468) at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365) at java.io.ObjectOutputStream.writeClass(ObjectOutputStream.java:1212) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1119) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) {code} The stage will hang forever instead of aborting. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24687) NoClassDefError thrown during task serialization causes job hang
[ https://issues.apache.org/jira/browse/SPARK-24687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-24687: - Attachment: hanging-960.png > When NoClassDefError thrown during task serialization will cause job hang > - > > Key: SPARK-24687 > URL: https://issues.apache.org/jira/browse/SPARK-24687 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0, 2.1.1 >Reporter: zhoukang >Priority: Major > Attachments: hanging-960.png > > > When below exception thrown: > {code:java} > Exception in thread "dag-scheduler-event-loop" > java.lang.NoClassDefFoundError: > Lcom/xxx/data/recommend/aggregator/queue/QueueName; > at java.lang.Class.getDeclaredFields0(Native Method) > at java.lang.Class.privateGetDeclaredFields(Class.java:2436) > at java.lang.Class.getDeclaredField(Class.java:1946) > at > java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659) > at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72) > at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480) > at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468) > at java.security.AccessController.doPrivileged(Native Method) > at java.io.ObjectStreamClass.(ObjectStreamClass.java:468) > at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365) > at java.io.ObjectOutputStream.writeClass(ObjectOutputStream.java:1212) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1119) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > {code} > Stage will always hang.Not abort. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (SPARK-24544) Print actual failure cause when function lookup fails
zhoukang created SPARK-24544: Summary: Print actual failure cause when function lookup fails Key: SPARK-24544 URL: https://issues.apache.org/jira/browse/SPARK-24544 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.3.1, 2.1.0 Reporter: zhoukang When we operate as below: {code} 0: jdbc:hive2://xxx/> create function funnel_analysis as 'com.xxx.hive.extend.udf.UapFunnelAnalysis'; 0: jdbc:hive2://xxx/> use mifi; +-+--+ | Result | +-+--+ +-+--+ 0: jdbc:hive2://xxx/> select funnel_analysis(1,",",1,''); Error: org.apache.spark.sql.AnalysisException: Undefined function: 'funnel_analysis'. This function is neither a registered temporary function nor a permanent function registered in the database 'xxx'.; line 1 pos 7 (state=,code=0) 0: jdbc:hive2://xxx/> describe function funnel_analysis; +---+--+ | function_desc | +---+--+ | Function: mifi.funnel_analysis| | Class: com.xxx.hive.extend.udf.UapFunnelAnalysis | | Usage: N/A. | +---+--+ {code} We can see that describe function returns the right information, but when we actually use this function we get an 'undefined function' exception.
Which is really misleading,the real cause is below: {code} 2018-06-13,17:15:16,725 INFO org.apache.spark.sql.hive.KuduHiveSessionCatalog: Error: org.apache.spark.sql.AnalysisException: No handler for Hive UDF 'com.xiaomi.mifi.hive.extend.udf.UapFunnelAnalysis': java.lang.IllegalStateException: Should not be called directly; at org.apache.hadoop.hive.ql.udf.generic.GenericUDTF.initialize(GenericUDTF.java:72) at org.apache.spark.sql.hive.HiveGenericUDTF.outputInspector$lzycompute(hiveUDFs.scala:204) at org.apache.spark.sql.hive.HiveGenericUDTF.outputInspector(hiveUDFs.scala:204) at org.apache.spark.sql.hive.HiveGenericUDTF.elementSchema$lzycompute(hiveUDFs.scala:212) at org.apache.spark.sql.hive.HiveGenericUDTF.elementSchema(hiveUDFs.scala:212) at org.apache.spark.sql.hive.HiveSessionCatalog$$anonfun$makeFunctionBuilder$1.apply(HiveSessionCatalog.scala:146) at org.apache.spark.sql.hive.HiveSessionCatalog$$anonfun$makeFunctionBuilder$1.apply(HiveSessionCatalog.scala:122) at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:94) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupFunction(SessionCatalog.scala:1060) at org.apache.spark.sql.hive.HiveSessionCatalog.org$apache$spark$sql$hive$HiveSessionCatalog$$super$lookupFunction(HiveSessionCatalog.scala:192) at org.apache.spark.sql.hive.HiveSessionCatalog$$anonfun$6.apply(HiveSessionCatalog.scala:192) at org.apache.spark.sql.hive.HiveSessionCatalog$$anonfun$6.apply(HiveSessionCatalog.scala:192) at scala.util.Try$.apply(Try.scala:192) at org.apache.spark.sql.hive.HiveSessionCatalog.lookupFunction0(HiveSessionCatalog.scala:192) at org.apache.spark.sql.hive.HiveSessionCatalog.lookupFunction(HiveSessionCatalog.scala:177) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$13$$anonfun$applyOrElse$6$$anonfun$applyOrElse$39.apply(Analyzer.scala:900) at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$13$$anonfun$applyOrElse$6$$anonfun$applyOrElse$39.apply(Analyzer.scala:900) at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:48) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$13$$anonfun$applyOrElse$6.applyOrElse(Analyzer.scala:899) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$13$$anonfun$applyOrElse$6.applyOrElse(Analyzer.scala:887) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:267) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273) {code} I think we should print this actual error for quick debugging. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail:
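A hedged sketch of the proposed improvement: surface the underlying error when building the function fails, instead of only the generic message. `lookupBuilder` below is a hypothetical stand-in for the catalog's builder lookup, not Spark's actual API:

```scala
import scala.util.{Failure, Success, Try}

// Illustrative only: wrap the builder call, log the real cause (e.g. "No
// handler for Hive UDF ...: Should not be called directly"), then rethrow the
// user-facing "Undefined function" error with the cause attached.
object FunctionLookup {
  def lookup(name: String, lookupBuilder: String => AnyRef): AnyRef =
    Try(lookupBuilder(name)) match {
      case Success(fn) => fn
      case Failure(cause) =>
        // Print the actual failure for quick debugging, instead of only
        // "Undefined function: '<name>'".
        println(s"Error while looking up function '$name': $cause")
        throw new IllegalArgumentException(s"Undefined function: '$name'", cause)
    }
}
```

Attaching the original exception as the cause also means JDBC clients that print full stack traces see the real problem without server-side log access.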
[jira] [Updated] (SPARK-24515) No need to warn user when output commit coordination is enabled
[ https://issues.apache.org/jira/browse/SPARK-24515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-24515: - Description: No need to warning user when output commit coordination enabled {code:java} // When speculation is on and output committer class name contains "Direct", we should warn // users that they may loss data if they are using a direct output committer. val speculationEnabled = self.conf.getBoolean("spark.speculation", false) val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "") if (speculationEnabled && outputCommitterClass.contains("Direct")) { val warningMessage = s"$outputCommitterClass may be an output committer that writes data directly to " + "the final location. Because speculation is enabled, this output committer may " + "cause data loss (see the case in SPARK-10063). If possible, please use an output " + "committer that does not have this behavior (e.g. FileOutputCommitter)." logWarning(warningMessage) } {code} was: {code:java} {code} No need to warning user when output commit coordination enabled {code} // When speculation is on and output committer class name contains "Direct", we should warn // users that they may loss data if they are using a direct output committer. val speculationEnabled = self.conf.getBoolean("spark.speculation", false) val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "") if (speculationEnabled && outputCommitterClass.contains("Direct")) { val warningMessage = s"$outputCommitterClass may be an output committer that writes data directly to " + "the final location. Because speculation is enabled, this output committer may " + "cause data loss (see the case in SPARK-10063). If possible, please use an output " + "committer that does not have this behavior (e.g. FileOutputCommitter)." 
logWarning(warningMessage) } {code} {code:java} // code placeholder {code} > No need to warning user when output commit coordination enabled > --- > > Key: SPARK-24515 > URL: https://issues.apache.org/jira/browse/SPARK-24515 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.3.1 >Reporter: zhoukang >Priority: Major > > > No need to warning user when output commit coordination enabled > {code:java} > // When speculation is on and output committer class name contains "Direct", > we should warn > // users that they may loss data if they are using a direct output committer. > val speculationEnabled = self.conf.getBoolean("spark.speculation", false) > val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "") > if (speculationEnabled && outputCommitterClass.contains("Direct")) { > val warningMessage = > s"$outputCommitterClass may be an output committer that writes data directly > to " + > "the final location. Because speculation is enabled, this output committer > may " + > "cause data loss (see the case in SPARK-10063). If possible, please use an > output " + > "committer that does not have this behavior (e.g. FileOutputCommitter)." > logWarning(warningMessage) > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
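The refinement being proposed can be sketched as gating the warning on the commit-coordination setting as well. The coordination key below mirrors Spark's internal `spark.hadoop.outputCommitCoordination.enabled` flag, but treat the exact names as assumptions; this is an illustrative predicate, not the actual writer code:

```scala
// Illustrative predicate: when output commit coordination is enabled, the
// coordinator already prevents duplicate commits from speculative tasks, so
// the direct-committer data-loss warning adds noise.
object CommitterWarning {
  def shouldWarn(conf: Map[String, String]): Boolean = {
    val speculation =
      conf.getOrElse("spark.speculation", "false").toBoolean
    val commitCoordination =  // assumed key name
      conf.getOrElse("spark.hadoop.outputCommitCoordination.enabled", "true").toBoolean
    val committer = conf.getOrElse("mapred.output.committer.class", "")
    speculation && !commitCoordination && committer.contains("Direct")
  }
}
```

With coordination left at its default of enabled, the warning would then be suppressed even when speculation is on and a direct committer is configured.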
[jira] [Updated] (SPARK-24515) No need to warning user when output commit coordination enabled
[ https://issues.apache.org/jira/browse/SPARK-24515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-24515: - Description: {code:java} {code} No need to warning user when output commit coordination enabled {code} // When speculation is on and output committer class name contains "Direct", we should warn // users that they may loss data if they are using a direct output committer. val speculationEnabled = self.conf.getBoolean("spark.speculation", false) val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "") if (speculationEnabled && outputCommitterClass.contains("Direct")) { val warningMessage = s"$outputCommitterClass may be an output committer that writes data directly to " + "the final location. Because speculation is enabled, this output committer may " + "cause data loss (see the case in SPARK-10063). If possible, please use an output " + "committer that does not have this behavior (e.g. FileOutputCommitter)." logWarning(warningMessage) } {code} {code:java} // code placeholder {code} was: {code:java} // When speculation is on and output committer class name contains "Direct", we should warn // users that they may loss data if they are using a direct output committer. val speculationEnabled = self.conf.getBoolean("spark.speculation", false) val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "") if (speculationEnabled && outputCommitterClass.contains("Direct")) { val warningMessage = s"$outputCommitterClass may be an output committer that writes data directly to " + "the final location. Because speculation is enabled, this output committer may " + "cause data loss (see the case in SPARK-10063). If possible, please use an output " + "committer that does not have this behavior (e.g. FileOutputCommitter)." 
logWarning(warningMessage) } {code} No need to warning user when output commit coordination enabled > No need to warning user when output commit coordination enabled > --- > > Key: SPARK-24515 > URL: https://issues.apache.org/jira/browse/SPARK-24515 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.3.1 >Reporter: zhoukang >Priority: Major > > > {code:java} > {code} > No need to warning user when output commit coordination enabled > {code} > // When speculation is on and output committer class name contains "Direct", > we should warn > // users that they may loss data if they are using a direct output committer. > val speculationEnabled = self.conf.getBoolean("spark.speculation", false) > val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "") > if (speculationEnabled && outputCommitterClass.contains("Direct")) { > val warningMessage = > s"$outputCommitterClass may be an output committer that writes data directly > to " + > "the final location. Because speculation is enabled, this output committer > may " + > "cause data loss (see the case in SPARK-10063). If possible, please use an > output " + > "committer that does not have this behavior (e.g. FileOutputCommitter)." > logWarning(warningMessage) > } > {code} > {code:java} > // code placeholder > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24515) No need to warning user when output commit coordination enabled
zhoukang created SPARK-24515: Summary: No need to warning user when output commit coordination enabled Key: SPARK-24515 URL: https://issues.apache.org/jira/browse/SPARK-24515 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.3.1 Reporter: zhoukang {code:java} // When speculation is on and output committer class name contains "Direct", we should warn // users that they may loss data if they are using a direct output committer. val speculationEnabled = self.conf.getBoolean("spark.speculation", false) val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "") if (speculationEnabled && outputCommitterClass.contains("Direct")) { val warningMessage = s"$outputCommitterClass may be an output committer that writes data directly to " + "the final location. Because speculation is enabled, this output committer may " + "cause data loss (see the case in SPARK-10063). If possible, please use an output " + "committer that does not have this behavior (e.g. FileOutputCommitter)." logWarning(warningMessage) } {code} No need to warning user when output commit coordination enabled -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
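The proposal in this ticket is to skip the data-loss warning when the output commit coordinator already guards task commits. A minimal Python sketch of that gating, mirroring the Scala snippet above; the `spark.hadoop.outputCommitCoordination.enabled` key is assumed here to represent the coordination flag and may not match Spark's exact internal configuration:

```python
def should_warn(conf: dict) -> bool:
    """Return True when the direct-committer data-loss warning should be logged.

    Mirrors the quoted Scala: warn only when speculation is on AND a
    "Direct" output committer is configured AND output commit coordination
    (assumed flag, default on) is NOT there to guard commits.
    """
    speculation = conf.get("spark.speculation", "false") == "true"
    committer = conf.get("mapred.output.committer.class", "")
    coordination = conf.get(
        "spark.hadoop.outputCommitCoordination.enabled", "true") == "true"
    return speculation and "Direct" in committer and not coordination
```

With coordination left at its default, the warning would no longer fire even for a direct committer under speculation, which is the behavior change the ticket asks for.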
[jira] [Created] (SPARK-24440) When use constant as column we may get wrong answer versus impala
zhoukang created SPARK-24440: Summary: When use constant as column we may get wrong answer versus impala Key: SPARK-24440 URL: https://issues.apache.org/jira/browse/SPARK-24440 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.3.0, 2.1.0 Reporter: zhoukang For query below: {code:java} select `date`, 100 as platform, count(distinct deviceid) as new_user from tv.clean_new_user where `date`=20180528 group by `date`, platform {code} We intended to group by 100 and get distinct deviceid number. By spark sql,we get: {code} +---+---+---+--+ | date| platform | new_user | +---+---+---+--+ | 20180528 | 100 | 521 | | 20180528 | 100 | 82| | 20180528 | 100 | 3 | | 20180528 | 100 | 2 | | 20180528 | 100 | 7 | | 20180528 | 100 | 870 | | 20180528 | 100 | 3 | | 20180528 | 100 | 8 | | 20180528 | 100 | 3 | | 20180528 | 100 | 2204 | | 20180528 | 100 | 1123 | | 20180528 | 100 | 1 | | 20180528 | 100 | 54| | 20180528 | 100 | 440 | | 20180528 | 100 | 4 | | 20180528 | 100 | 478 | | 20180528 | 100 | 34| | 20180528 | 100 | 195 | | 20180528 | 100 | 17| | 20180528 | 100 | 18| | 20180528 | 100 | 2 | | 20180528 | 100 | 2 | | 20180528 | 100 | 84| | 20180528 | 100 | 1616 | | 20180528 | 100 | 15| | 20180528 | 100 | 7 | | 20180528 | 100 | 479 | | 20180528 | 100 | 50| | 20180528 | 100 | 376 | | 20180528 | 100 | 21| | 20180528 | 100 | 842 | | 20180528 | 100 | 444 | | 20180528 | 100 | 538 | | 20180528 | 100 | 1 | | 20180528 | 100 | 2 | | 20180528 | 100 | 7 | | 20180528 | 100 | 17| | 20180528 | 100 | 133 | | 20180528 | 100 | 7 | | 20180528 | 100 | 415 | | 20180528 | 100 | 2 | | 20180528 | 100 | 318 | | 20180528 | 100 | 5 | | 20180528 | 100 | 1 | | 20180528 | 100 | 2060 | | 20180528 | 100 | 1217 | | 20180528 | 100 | 2 | | 20180528 | 100 | 60| | 20180528 | 100 | 22| | 20180528 | 100 | 4 | +---+---+---+--+ {code} Actually sum of the deviceid is below: {code} 0: jdbc:hive2://xxx/> select sum(t1.new_user) from (select `date`, 100 as platform, count(distinct deviceid) as new_user from 
tv.clean_new_user where `date`=20180528 group by `date`, platform)t1; ++--+ | sum(new_user) | ++--+ | 14816 | ++--+ 1 row selected (4.934 seconds) {code} And the real distinct deviceid value is below: {code} 0: jdbc:hive2://xxx/> select 100 as platform, count(distinct deviceid) as new_user from tv.clean_new_user where `date`=20180528; +---+---+--+ | platform | new_user | +---+---+--+ | 100 | 14773 | +---+---+--+ 1 row selected (2.846 seconds) {code} In impala,with the first query we can get result below: {code} [xxx] > select `date`, 100 as platform, count(distinct deviceid) as new_user from tv.clean_new_user where `date`=20180528 group by `date`, platform;Query: select `date`, 100 as platform, count(distinct deviceid) as new_user from tv.clean_new_user where `date`=20180528 group by `date`, platform +--+--+--+ | date | platform | new_user | +--+--+--+ | 20180528 | 100 | 14773| +--+--+--+ Fetched 1 row(s) in 1.00s {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
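The expected semantics of the query above can be sketched in plain Python: grouping by `date` and a constant column must collapse to a single group per date, so the distinct device count is taken over all rows for that date — one row of 14773, as Impala returns, not fifty partial counts:

```python
from collections import defaultdict

def distinct_count_by_group(rows, platform=100):
    """Group (date, deviceid) rows by (date, constant platform) and count
    distinct device ids per group. A constant grouping key cannot split a
    date into multiple groups, so exactly one group per date is correct."""
    groups = defaultdict(set)
    for date, deviceid in rows:
        groups[(date, platform)].add(deviceid)
    return {key: len(devices) for key, devices in groups.items()}
```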
[jira] [Updated] (SPARK-24084) Add job group id for query through spark-sql
[ https://issues.apache.org/jira/browse/SPARK-24084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-24084: - Description: For spark-sql we can add job group id for the same statement. (was: For thrift server we can add job group id for the same statement.Like below:) > Add job group id for query through spark-sql > > > Key: SPARK-24084 > URL: https://issues.apache.org/jira/browse/SPARK-24084 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: zhoukang >Priority: Major > > For spark-sql we can add job group id for the same statement. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24084) Add job group id for query through spark-sql
[ https://issues.apache.org/jira/browse/SPARK-24084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-24084: - Summary: Add job group id for query through spark-sql (was: Add job group id for query through Thrift Server) > Add job group id for query through spark-sql > > > Key: SPARK-24084 > URL: https://issues.apache.org/jira/browse/SPARK-24084 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: zhoukang >Priority: Major > > For thrift server we can add job group id for the same statement.Like below: -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24084) Add job group id for query through Thrift Server
zhoukang created SPARK-24084: Summary: Add job group id for query through Thrift Server Key: SPARK-24084 URL: https://issues.apache.org/jira/browse/SPARK-24084 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.3.0 Reporter: zhoukang For the Thrift server we can add a job group id for the same statement. Like below: -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org

[jira] [Created] (SPARK-24083) Diagnostics message for uncaught exceptions should include the stacktrace
zhoukang created SPARK-24083: Summary: Diagnostics message for uncaught exceptions should include the stacktrace Key: SPARK-24083 URL: https://issues.apache.org/jira/browse/SPARK-24083 Project: Spark Issue Type: Improvement Components: YARN Affects Versions: 2.3.0 Reporter: zhoukang Like [SPARK-23296|https://issues.apache.org/jira/browse/SPARK-23296]. For uncaught exceptions, we should also print the stacktrace. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24059) When blacklist disable always hash to a bad local directory may cause job failure
[ https://issues.apache.org/jira/browse/SPARK-24059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449254#comment-16449254 ] zhoukang commented on SPARK-24059: -- When we set multiple local dir on different disk. Retry will make sense. [~srowen] > When blacklist disable always hash to a bad local directory may cause job > failure > - > > Key: SPARK-24059 > URL: https://issues.apache.org/jira/browse/SPARK-24059 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: zhoukang >Priority: Major > > When blacklist disable , if we always hashed temp shuffle to a bad local > directory on the same executor will cause job failure. > Like below: > {code:java} > java.io.FileNotFoundException: > /home/work/hdd8/yarn/xxx/nodemanager/usercache/xxx/appcache/application_1520502842490_20813/blockmgr-3beeddbf-bb83-4a74-ad7a-e796fe592b7c/27/temp_shuffle_159e6886-b76f-4d96-9600-aee62ada0fa9 > (Read-only file system) > java.io.FileNotFoundException: > /home/work/hdd8/yarn/xxx/nodemanager/usercache/xxx/appcache/application_1520502842490_20813/blockmgr-3beeddbf-bb83-4a74-ad7a-e796fe592b7c/06/temp_shuffle_ba7f0a29-8e02-4ffa-94f7-01f72d214821 > (Read-only file system) > java.io.FileNotFoundException: > /home/work/hdd8/yarn/xxx/nodemanager/usercache/xxx/appcache/application_1520502842490_20813/blockmgr-3beeddbf-bb83-4a74-ad7a-e796fe592b7c/32/temp_shuffle_7030256c-fc24-4d45-a901-be23c2c3fbd6 > (Read-only file system) > java.io.FileNotFoundException: > /home/work/hdd8/yarn/zjyprc-hadoop/nodemanager/usercache/h_message_push/appcache/application_1520502842490_20813/blockmgr-3beeddbf-bb83-4a74-ad7a-e796fe592b7c/14/temp_shuffle_65816622-6217-43b9-bc9e-e2f67dc9a9de > (Read-only file system) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24059) When blacklist disable always hash to a bad local directory may cause job failure
[ https://issues.apache.org/jira/browse/SPARK-24059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449175#comment-16449175 ] zhoukang commented on SPARK-24059: -- cc [~vanzin] [~srowen] If we add retry make sense? I will work on this if this fix make sense! Thanks > When blacklist disable always hash to a bad local directory may cause job > failure > - > > Key: SPARK-24059 > URL: https://issues.apache.org/jira/browse/SPARK-24059 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: zhoukang >Priority: Major > > When blacklist disable , if we always hashed temp shuffle to a bad local > directory on the same executor will cause job failure. > Like below: > {code:java} > java.io.FileNotFoundException: > /home/work/hdd8/yarn/xxx/nodemanager/usercache/xxx/appcache/application_1520502842490_20813/blockmgr-3beeddbf-bb83-4a74-ad7a-e796fe592b7c/27/temp_shuffle_159e6886-b76f-4d96-9600-aee62ada0fa9 > (Read-only file system) > java.io.FileNotFoundException: > /home/work/hdd8/yarn/xxx/nodemanager/usercache/xxx/appcache/application_1520502842490_20813/blockmgr-3beeddbf-bb83-4a74-ad7a-e796fe592b7c/06/temp_shuffle_ba7f0a29-8e02-4ffa-94f7-01f72d214821 > (Read-only file system) > java.io.FileNotFoundException: > /home/work/hdd8/yarn/xxx/nodemanager/usercache/xxx/appcache/application_1520502842490_20813/blockmgr-3beeddbf-bb83-4a74-ad7a-e796fe592b7c/32/temp_shuffle_7030256c-fc24-4d45-a901-be23c2c3fbd6 > (Read-only file system) > java.io.FileNotFoundException: > /home/work/hdd8/yarn/zjyprc-hadoop/nodemanager/usercache/h_message_push/appcache/application_1520502842490_20813/blockmgr-3beeddbf-bb83-4a74-ad7a-e796fe592b7c/14/temp_shuffle_65816622-6217-43b9-bc9e-e2f67dc9a9de > (Read-only file system) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24059) When blacklist disable always hash to a bad local directory may cause job failure
zhoukang created SPARK-24059: Summary: When blacklist disable always hash to a bad local directory may cause job failure Key: SPARK-24059 URL: https://issues.apache.org/jira/browse/SPARK-24059 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.3.0 Reporter: zhoukang When blacklist disable , if we always hashed temp shuffle to a bad local directory on the same executor will cause job failure. Like below: {code:java} java.io.FileNotFoundException: /home/work/hdd8/yarn/xxx/nodemanager/usercache/xxx/appcache/application_1520502842490_20813/blockmgr-3beeddbf-bb83-4a74-ad7a-e796fe592b7c/27/temp_shuffle_159e6886-b76f-4d96-9600-aee62ada0fa9 (Read-only file system) java.io.FileNotFoundException: /home/work/hdd8/yarn/xxx/nodemanager/usercache/xxx/appcache/application_1520502842490_20813/blockmgr-3beeddbf-bb83-4a74-ad7a-e796fe592b7c/06/temp_shuffle_ba7f0a29-8e02-4ffa-94f7-01f72d214821 (Read-only file system) java.io.FileNotFoundException: /home/work/hdd8/yarn/xxx/nodemanager/usercache/xxx/appcache/application_1520502842490_20813/blockmgr-3beeddbf-bb83-4a74-ad7a-e796fe592b7c/32/temp_shuffle_7030256c-fc24-4d45-a901-be23c2c3fbd6 (Read-only file system) java.io.FileNotFoundException: /home/work/hdd8/yarn/zjyprc-hadoop/nodemanager/usercache/h_message_push/appcache/application_1520502842490_20813/blockmgr-3beeddbf-bb83-4a74-ad7a-e796fe592b7c/14/temp_shuffle_65816622-6217-43b9-bc9e-e2f67dc9a9de (Read-only file system) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
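The retry discussed in this ticket can be sketched as follows: hash the temp shuffle file name to one of the configured local dirs as today, then probe the remaining dirs in order when the hashed one fails. The `bad_dirs` set is a hypothetical stand-in for the actual I/O failure (e.g. the read-only file system in the stack traces above):

```python
import hashlib

def pick_local_dir(local_dirs, filename, bad_dirs=frozenset()):
    """Pick a local directory for a temp shuffle file.

    Hash the file name to a starting directory, then fall through to the
    next directories instead of failing the job outright when the hashed
    one is bad -- without retry, every attempt re-hashes to the same bad
    disk and the job eventually fails.
    """
    n = len(local_dirs)
    start = int(hashlib.md5(filename.encode()).hexdigest(), 16) % n
    for offset in range(n):
        candidate = local_dirs[(start + offset) % n]
        if candidate not in bad_dirs:
            return candidate
    raise IOError("all local directories failed")
```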
[jira] [Created] (SPARK-24053) Support add subdirectory named as user name on staging directory
zhoukang created SPARK-24053: Summary: Support add subdirectory named as user name on staging directory Key: SPARK-24053 URL: https://issues.apache.org/jira/browse/SPARK-24053 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.3.0 Reporter: zhoukang When we have multiple users on the same cluster, we can add a subdirectory under the staging directory named after the login user -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24052) Support spark version showing on environment page
[ https://issues.apache.org/jira/browse/SPARK-24052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447917#comment-16447917 ] zhoukang commented on SPARK-24052: -- [~jiangxb] Could you help review this? Thanks! > Support spark version showing on environment page > - > > Key: SPARK-24052 > URL: https://issues.apache.org/jira/browse/SPARK-24052 > Project: Spark > Issue Type: Improvement > Components: Web UI >Affects Versions: 2.3.0 >Reporter: zhoukang >Priority: Major > Attachments: environment-page.png > > > Since we may have multiple version in production cluster,we can showing some > information on environment page like below: > !environment-page.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24052) Support spark version showing on environment page
[ https://issues.apache.org/jira/browse/SPARK-24052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-24052: - Description: Since we may have multiple version in production cluster,we can showing some information on environment page like below: !environment-page.png! > Support spark version showing on environment page > - > > Key: SPARK-24052 > URL: https://issues.apache.org/jira/browse/SPARK-24052 > Project: Spark > Issue Type: Improvement > Components: Web UI >Affects Versions: 2.3.0 >Reporter: zhoukang >Priority: Major > Attachments: environment-page.png > > > Since we may have multiple version in production cluster,we can showing some > information on environment page like below: > !environment-page.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24052) Support spark version showing on environment page
[ https://issues.apache.org/jira/browse/SPARK-24052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-24052: - Attachment: environment-page.png > Support spark version showing on environment page > - > > Key: SPARK-24052 > URL: https://issues.apache.org/jira/browse/SPARK-24052 > Project: Spark > Issue Type: Improvement > Components: Web UI >Affects Versions: 2.3.0 >Reporter: zhoukang >Priority: Major > Attachments: environment-page.png > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24052) Support spark version showing on environment page
zhoukang created SPARK-24052: Summary: Support spark version showing on environment page Key: SPARK-24052 URL: https://issues.apache.org/jira/browse/SPARK-24052 Project: Spark Issue Type: Improvement Components: Web UI Affects Versions: 2.3.0 Reporter: zhoukang -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-23692) Print metadata of files when infer schema failed
[ https://issues.apache.org/jira/browse/SPARK-23692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang resolved SPARK-23692. -- Resolution: Won't Fix > Print metadata of files when infer schema failed > > > Key: SPARK-23692 > URL: https://issues.apache.org/jira/browse/SPARK-23692 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: zhoukang >Priority: Minor > > A trivial modify. > Currently, when we had no input files to infer schema,we will throw below > exception. > For some users it may be misleading.If we can print files' metadata it will > be more clearer. > {code:java} > Caused by: org.apache.spark.sql.AnalysisException: Unable to infer schema for > Parquet. It must be specified manually.; > at > org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:189) > at > org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:189) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$getOrInferFileFormatSchema(DataSource.scala:188) > at > org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:387) > at > org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152) > at > org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:441) > at > org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:425) > at > com.xiaomi.matrix.pipeline.jobspark.importer.MatrixAdEventDailyImportJob.(MatrixAdEventDailyImportJob.scala:18) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-23665) Add adaptive algorithm to select query result collect method
[ https://issues.apache.org/jira/browse/SPARK-23665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang resolved SPARK-23665. -- Resolution: Won't Fix > Add adaptive algorithm to select query result collect method > > > Key: SPARK-23665 > URL: https://issues.apache.org/jira/browse/SPARK-23665 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.1.1, 2.3.0 >Reporter: zhoukang >Priority: Major > > Currently, we use configuration like > {code:java} > spark.sql.thriftServer.incrementalCollect > {code} > to specify query result collect method. > Actually,we can estimate the size of the result and select collect method > automatically. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23708) Comment of ShutdownHookManager.addShutdownHook is error
[ https://issues.apache.org/jira/browse/SPARK-23708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-23708: - Issue Type: Improvement (was: Bug) > Comment of ShutdownHookManager.addShutdownHook is error > --- > > Key: SPARK-23708 > URL: https://issues.apache.org/jira/browse/SPARK-23708 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: zhoukang >Priority: Minor > > Comment below is not right! > {code:java} > /** >* Adds a shutdown hook with the given priority. Hooks with lower priority > values run >* first. >* >* @param hook The code to run during shutdown. >* @return A handle that can be used to unregister the shutdown hook. >*/ > def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = { > shutdownHooks.add(priority, hook) > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23708) Comment of ShutdownHookManager.addShutdownHook is error
[ https://issues.apache.org/jira/browse/SPARK-23708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-23708: - Issue Type: Bug (was: Improvement) > Comment of ShutdownHookManager.addShutdownHook is error > --- > > Key: SPARK-23708 > URL: https://issues.apache.org/jira/browse/SPARK-23708 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: zhoukang >Priority: Minor > > Comment below is not right! > {code:java} > /** >* Adds a shutdown hook with the given priority. Hooks with lower priority > values run >* first. >* >* @param hook The code to run during shutdown. >* @return A handle that can be used to unregister the shutdown hook. >*/ > def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = { > shutdownHooks.add(priority, hook) > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23708) Comment of ShutdownHookManager.addShutdownHook is error
zhoukang created SPARK-23708: Summary: Comment of ShutdownHookManager.addShutdownHook is error Key: SPARK-23708 URL: https://issues.apache.org/jira/browse/SPARK-23708 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.3.0 Reporter: zhoukang Comment below is not right! {code:java} /** * Adds a shutdown hook with the given priority. Hooks with lower priority values run * first. * * @param hook The code to run during shutdown. * @return A handle that can be used to unregister the shutdown hook. */ def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = { shutdownHooks.add(priority, hook) } {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
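The error the ticket reports is that the Scaladoc claims hooks with *lower* priority values run first, while the manager actually runs *higher*-priority hooks first. A small Python sketch of the actual ordering (a max-heap via negated priority; an assumed illustration, not Spark's implementation):

```python
import heapq
import itertools

class ShutdownHooks:
    """Run registered hooks in DESCENDING priority order -- the behavior
    the quoted comment gets backwards. The counter breaks ties by
    insertion order so hook callables are never compared directly."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()

    def add(self, priority, hook):
        # Negate priority: heapq is a min-heap, so the highest priority
        # ends up at the top of the heap.
        heapq.heappush(self._heap, (-priority, next(self._counter), hook))

    def run_all(self):
        results = []
        while self._heap:
            _, _, hook = heapq.heappop(self._heap)
            results.append(hook())
        return results
```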
[jira] [Created] (SPARK-23692) Print metadata of files when infer schema failed
zhoukang created SPARK-23692: Summary: Print metadata of files when infer schema failed Key: SPARK-23692 URL: https://issues.apache.org/jira/browse/SPARK-23692 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.3.0 Reporter: zhoukang A trivial modify. Currently, when we had no input files to infer schema,we will throw below exception. For some users it may be misleading.If we can print files' metadata it will be more clearer. {code:java} Caused by: org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It must be specified manually.; at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:189) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:189) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$getOrInferFileFormatSchema(DataSource.scala:188) at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:387) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152) at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:441) at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:425) at com.xiaomi.matrix.pipeline.jobspark.importer.MatrixAdEventDailyImportJob.(MatrixAdEventDailyImportJob.scala:18) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23665) Add adaptive algorithm to select query result collect method
[ https://issues.apache.org/jira/browse/SPARK-23665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398706#comment-16398706 ] zhoukang commented on SPARK-23665: -- Add this logic so that users do not need to choose whether to set {code:java} spark.sql.thriftServer.incrementalCollect {code} or not. Does this make sense?Thanks! > Add adaptive algorithm to select query result collect method > > > Key: SPARK-23665 > URL: https://issues.apache.org/jira/browse/SPARK-23665 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.1.1, 2.3.0 >Reporter: zhoukang >Priority: Major > > Currently, we use configuration like > {code:java} > spark.sql.thriftServer.incrementalCollect > {code} > to specify query result collect method. > Actually,we can estimate the size of the result and select collect method > automatically. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23665) Add adaptive algorithm to select query result collect method
zhoukang created SPARK-23665: Summary: Add adaptive algorithm to select query result collect method Key: SPARK-23665 URL: https://issues.apache.org/jira/browse/SPARK-23665 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.3.0, 2.1.1 Reporter: zhoukang Currently, we use a configuration like {code:java} spark.sql.thriftServer.incrementalCollect {code} to specify the query result collection method. Actually, we can estimate the size of the result and select the collection method automatically. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
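The adaptive selection proposed here can be sketched as a simple threshold check. This is a hedged illustration only: the size estimate and memory budget are assumptions, and a real implementation would derive them from statistics and driver memory settings rather than the hypothetical parameters below:

```python
def choose_collect_method(estimated_rows, avg_row_bytes, memory_budget_bytes):
    """Pick the result collection strategy from an estimated result size:
    fall back to incremental (row-by-row) collection when the full result
    would not fit the driver's memory budget, otherwise collect eagerly.
    All three parameters are hypothetical inputs for illustration."""
    estimated_bytes = estimated_rows * avg_row_bytes
    return "incremental" if estimated_bytes > memory_budget_bytes else "collect"
```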
[jira] [Created] (SPARK-23664) Add interface to collect query result through file iterator
zhoukang created SPARK-23664: Summary: Add interface to collect query result through file iterator Key: SPARK-23664 URL: https://issues.apache.org/jira/browse/SPARK-23664 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.3.0, 2.1.1 Reporter: zhoukang Currently, we use Spark SQL through JDBC. Results may consume a lot of memory, since we collect them and cache them in memory for performance. However, we can also add an API to collect the result through a file iterator (like a Parquet file iterator), so we can avoid OOMs in the Thrift server for big queries. Like below: {code:java} result.toLocalIteratorThroughFile.asScala {code} I will work on this if it makes sense! And in our internal cluster we have used this API for about a year. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
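The idea behind the proposed `toLocalIteratorThroughFile` can be sketched in Python: spill the result to a file first, then stream it back row by row, so the server never holds the full result set in memory at once. CSV stands in here for the Parquet files the ticket mentions; this is an illustration of the pattern, not Spark's API:

```python
import csv
import os
import tempfile

def to_local_iterator_through_file(rows, tmpdir=None):
    """Write `rows` to a temp file, then return an iterator that streams
    the rows back one at a time and removes the file when exhausted.
    Peak memory is one row, regardless of result size."""
    fd, path = tempfile.mkstemp(suffix=".csv", dir=tmpdir)
    with os.fdopen(fd, "w", newline="") as f:
        csv.writer(f).writerows(rows)

    def iterate():
        with open(path, newline="") as f:
            for row in csv.reader(f):
                yield tuple(row)
        os.remove(path)  # clean up once the consumer has read everything

    return iterate()
```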
[jira] [Commented] (SPARK-23513) java.io.IOException: Expected 12 fields, but got 5 for row :Spark submit error
[ https://issues.apache.org/jira/browse/SPARK-23513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391293#comment-16391293 ] zhoukang commented on SPARK-23513: -- Could you please post some more details?[~Fray] > java.io.IOException: Expected 12 fields, but got 5 for row :Spark submit > error > --- > > Key: SPARK-23513 > URL: https://issues.apache.org/jira/browse/SPARK-23513 > Project: Spark > Issue Type: Bug > Components: EC2, Examples, Input/Output, Java API >Affects Versions: 1.4.0, 2.2.0 >Reporter: Rawia >Priority: Blocker > > Hello > I'm trying to run a spark application (distributedWekaSpark) but when I'm > using the spark-submit command I get this error > {quote}{quote}ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) > java.io.IOException: Expected 12 fields, but got 5 for row: > outlook,temperature,humidity,windy,play > {quote}{quote} > I tried with other datasets but always the same error appeared, (always 12 > fields expected) -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-23549) Spark SQL unexpected behavior when comparing timestamp to date
[ https://issues.apache.org/jira/browse/SPARK-23549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391282#comment-16391282 ] zhoukang edited comment on SPARK-23549 at 3/8/18 2:07 PM: -- I think this is a bug, which may be caused by the rule below (I have not run any tests, just guessing): {code:java} case p @ BinaryComparison(left, right) if findCommonTypeForBinaryComparison(left.dataType, right.dataType).isDefined => val commonType = findCommonTypeForBinaryComparison(left.dataType, right.dataType).get p.makeCopy(Array(castExpr(left, commonType), castExpr(right, commonType))) {code} findCommonTypeForBinaryComparison will return StringType: {code:java} case (DateType, TimestampType) => Some(StringType) {code} Maybe we can add a new rule for this case? was (Author: cane): I think this is a bug.Which may caused by rule below: {code:java} case p @ BinaryComparison(left, right) if findCommonTypeForBinaryComparison(left.dataType, right.dataType).isDefined => val commonType = findCommonTypeForBinaryComparison(left.dataType, right.dataType).get p.makeCopy(Array(castExpr(left, commonType), castExpr(right, commonType))) {code} findCommonTypeForBinaryComparison will return StringType: {code:java} case (DateType, TimestampType) => Some(StringType) {code} May be we can add a new rule for this case? 
> Spark SQL unexpected behavior when comparing timestamp to date > -- > > Key: SPARK-23549 > URL: https://issues.apache.org/jira/browse/SPARK-23549 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.1 >Reporter: Dong Jiang >Priority: Major > > {code:java} > scala> spark.version > res1: String = 2.2.1 > scala> spark.sql("select cast('2017-03-01 00:00:00' as timestamp) between > cast('2017-02-28' as date) and cast('2017-03-01' as date)").show > +---+ > |((CAST(CAST(2017-03-01 00:00:00 AS TIMESTAMP) AS STRING) >= > CAST(CAST(2017-02-28 AS DATE) AS STRING)) AND (CAST(CAST(2017-03-01 00:00:00 > AS TIMESTAMP) AS STRING) <= CAST(CAST(2017-03-01 AS DATE) AS STRING)))| > +---+ > | > > false| > +---+{code} > As shown above, when a timestamp is compared to date in SparkSQL, both > timestamp and date are downcast to string, and leading to unexpected result. > If run the same SQL in presto/Athena, I got the expected result > {code:java} > select cast('2017-03-01 00:00:00' as timestamp) between cast('2017-02-28' as > date) and cast('2017-03-01' as date) > _col0 > 1 true > {code} > Is this a bug for Spark or a feature? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23549) Spark SQL unexpected behavior when comparing timestamp to date
[ https://issues.apache.org/jira/browse/SPARK-23549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391282#comment-16391282 ] zhoukang commented on SPARK-23549: -- I think this is a bug, which may be caused by the rule below: {code:java} case p @ BinaryComparison(left, right) if findCommonTypeForBinaryComparison(left.dataType, right.dataType).isDefined => val commonType = findCommonTypeForBinaryComparison(left.dataType, right.dataType).get p.makeCopy(Array(castExpr(left, commonType), castExpr(right, commonType))) {code} findCommonTypeForBinaryComparison will return StringType: {code:java} case (DateType, TimestampType) => Some(StringType) {code} Maybe we can add a new rule for this case? > Spark SQL unexpected behavior when comparing timestamp to date > -- > > Key: SPARK-23549 > URL: https://issues.apache.org/jira/browse/SPARK-23549 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.1 >Reporter: Dong Jiang >Priority: Major > > {code:java} > scala> spark.version > res1: String = 2.2.1 > scala> spark.sql("select cast('2017-03-01 00:00:00' as timestamp) between > cast('2017-02-28' as date) and cast('2017-03-01' as date)").show > +---+ > |((CAST(CAST(2017-03-01 00:00:00 AS TIMESTAMP) AS STRING) >= > CAST(CAST(2017-02-28 AS DATE) AS STRING)) AND (CAST(CAST(2017-03-01 00:00:00 > AS TIMESTAMP) AS STRING) <= CAST(CAST(2017-03-01 AS DATE) AS STRING)))| > +---+ > | > > false| > +---+{code} > As shown above, when a timestamp is compared to a date in Spark SQL, both > timestamp and date are downcast to string, leading to an unexpected result. > If I run the same SQL in Presto/Athena, I get the expected result > {code:java} > select cast('2017-03-01 00:00:00' as timestamp) between cast('2017-02-28' as > date) and cast('2017-03-01' as date) > _col0 > 1 true > {code} > Is this a bug for Spark or a feature? 
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
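The surprising `false` above follows directly from lexicographic comparison once both sides are cast to StringType: '2017-03-01 00:00:00' sorts after '2017-03-01' because it is a longer string with the same prefix. A small Java demonstration of the comparison Spark effectively performs in that plan (plain `String.compareTo` standing in for Spark's string ordering):

```java
public class TimestampDateCompare {
    public static void main(String[] args) {
        String ts = "2017-03-01 00:00:00";   // CAST(timestamp AS STRING)
        String lower = "2017-02-28";         // CAST(date AS STRING)
        String upper = "2017-03-01";         // CAST(date AS STRING)

        // BETWEEN expands to ts >= lower AND ts <= upper, evaluated on strings.
        boolean geLower = ts.compareTo(lower) >= 0;  // true
        boolean leUpper = ts.compareTo(upper) <= 0;  // false: a longer string with
                                                     // the same prefix sorts later
        System.out.println(geLower && leUpper);      // prints false
    }
}
```

Promoting the common type to TimestampType instead of StringType (as the comment suggests via a new rule for the `(DateType, TimestampType)` case) would make midnight of 2017-03-01 compare equal to the date 2017-03-01 and return true, matching Presto/Athena.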
[jira] [Resolved] (SPARK-23335) expression 'select conv('ffffffffffffffff',16,10) % 2' return 0
[ https://issues.apache.org/jira/browse/SPARK-23335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang resolved SPARK-23335. -- Resolution: Won't Fix > expression 'select conv('',16,10) % 2' return 0 > --- > > Key: SPARK-23335 > URL: https://issues.apache.org/jira/browse/SPARK-23335 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: zhoukang >Priority: Major > > For below expression: > {code:java} > select conv('',16,10) % 2; > {code} > it will return 0. > {code:java} > 0: jdbc:hive2://xxx:16> select conv('',16,10) % 2; > > +--+--+ > > | (CAST(conv(, 16, 10) AS DOUBLE) % CAST(CAST(2 AS > DECIMAL(20,0)) AS DOUBLE)) | > > +--+--+ > | 0.0 | > +--+--+ > {code} > It caused by: > {code:java} > case a @ BinaryArithmetic(left @ StringType(), right) => > a.makeCopy(Array(Cast(left, DoubleType), right)) > case a @ BinaryArithmetic(left, right @ StringType()) => > a.makeCopy(Array(left, Cast(right, DoubleType))) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23335) expression 'select conv('ffffffffffffffff',16,10) % 2' return 0
[ https://issues.apache.org/jira/browse/SPARK-23335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16381827#comment-16381827 ] zhoukang commented on SPARK-23335: -- Actually, Hive returns the same result: {code:java} 0: jdbc:hive2://c3-hadoop-hive01.bj:18203/> select conv('ffffffffffffffff',16,10) % 2; +--+--+ | _c0 | +--+--+ | 0.0 | +--+--+ {code} > expression 'select conv('ffffffffffffffff',16,10) % 2' return 0 > --- > > Key: SPARK-23335 > URL: https://issues.apache.org/jira/browse/SPARK-23335 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: zhoukang >Priority: Major > > For the expression below: > {code:java} > select conv('ffffffffffffffff',16,10) % 2; > {code} > it will return 0. > {code:java} > 0: jdbc:hive2://xxx:16> select conv('ffffffffffffffff',16,10) % 2; > +--+--+ > | (CAST(conv(ffffffffffffffff, 16, 10) AS DOUBLE) % CAST(CAST(2 AS > DECIMAL(20,0)) AS DOUBLE)) | > +--+--+ > | 0.0 | > +--+--+ > {code} > It is caused by: > {code:java} > case a @ BinaryArithmetic(left @ StringType(), right) => > a.makeCopy(Array(Cast(left, DoubleType), right)) > case a @ BinaryArithmetic(left, right @ StringType()) => > a.makeCopy(Array(left, Cast(right, DoubleType))) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-23335) expression 'select conv('ffffffffffffffff',16,10) % 2' return 0
[ https://issues.apache.org/jira/browse/SPARK-23335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16381827#comment-16381827 ] zhoukang edited comment on SPARK-23335 at 3/1/18 10:56 AM: --- Actually hive return the same result {code:java} 0: jdbc:hive2://xxx/> select conv('',16,10) % 2; +--+--+ | _c0 | +--+--+ | 0.0 | +--+--+ {code} was (Author: cane): Actually hive return the same result {code:java} 0: jdbc:hive2://c3-hadoop-hive01.bj:18203/> select conv('',16,10) % 2; +--+--+ | _c0 | +--+--+ | 0.0 | +--+--+ {code} > expression 'select conv('',16,10) % 2' return 0 > --- > > Key: SPARK-23335 > URL: https://issues.apache.org/jira/browse/SPARK-23335 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: zhoukang >Priority: Major > > For below expression: > {code:java} > select conv('',16,10) % 2; > {code} > it will return 0. > {code:java} > 0: jdbc:hive2://xxx:16> select conv('',16,10) % 2; > > +--+--+ > > | (CAST(conv(, 16, 10) AS DOUBLE) % CAST(CAST(2 AS > DECIMAL(20,0)) AS DOUBLE)) | > > +--+--+ > | 0.0 | > +--+--+ > {code} > It caused by: > {code:java} > case a @ BinaryArithmetic(left @ StringType(), right) => > a.makeCopy(Array(Cast(left, DoubleType), right)) > case a @ BinaryArithmetic(left, right @ StringType()) => > a.makeCopy(Array(left, Cast(right, DoubleType))) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23335) expression 'select conv('ffffffffffffffff',16,10) % 2' return 0
[ https://issues.apache.org/jira/browse/SPARK-23335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16381824#comment-16381824 ] zhoukang commented on SPARK-23335: -- Agreed! [~cloud_fan] I will look into Hive. > expression 'select conv('ffffffffffffffff',16,10) % 2' return 0 > --- > > Key: SPARK-23335 > URL: https://issues.apache.org/jira/browse/SPARK-23335 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: zhoukang >Priority: Major > > For the expression below: > {code:java} > select conv('ffffffffffffffff',16,10) % 2; > {code} > it will return 0. > {code:java} > 0: jdbc:hive2://xxx:16> select conv('ffffffffffffffff',16,10) % 2; > +--+--+ > | (CAST(conv(ffffffffffffffff, 16, 10) AS DOUBLE) % CAST(CAST(2 AS > DECIMAL(20,0)) AS DOUBLE)) | > +--+--+ > | 0.0 | > +--+--+ > {code} > It is caused by: > {code:java} > case a @ BinaryArithmetic(left @ StringType(), right) => > a.makeCopy(Array(Cast(left, DoubleType), right)) > case a @ BinaryArithmetic(left, right @ StringType()) => > a.makeCopy(Array(left, Cast(right, DoubleType))) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23335) expression 'select conv('ffffffffffffffff',16,10) % 2' return 0
[ https://issues.apache.org/jira/browse/SPARK-23335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16381800#comment-16381800 ] zhoukang commented on SPARK-23335: -- [~jiangxb1987] [~cloud_fan] Any suggestions? Thanks! > expression 'select conv('ffffffffffffffff',16,10) % 2' return 0 > --- > > Key: SPARK-23335 > URL: https://issues.apache.org/jira/browse/SPARK-23335 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: zhoukang >Priority: Major > > For the expression below: > {code:java} > select conv('ffffffffffffffff',16,10) % 2; > {code} > it will return 0. > {code:java} > 0: jdbc:hive2://xxx:16> select conv('ffffffffffffffff',16,10) % 2; > +--+--+ > | (CAST(conv(ffffffffffffffff, 16, 10) AS DOUBLE) % CAST(CAST(2 AS > DECIMAL(20,0)) AS DOUBLE)) | > +--+--+ > | 0.0 | > +--+--+ > {code} > It is caused by: > {code:java} > case a @ BinaryArithmetic(left @ StringType(), right) => > a.makeCopy(Array(Cast(left, DoubleType), right)) > case a @ BinaryArithmetic(left, right @ StringType()) => > a.makeCopy(Array(left, Cast(right, DoubleType))) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
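The 0.0 above is a precision artifact of the cast to DOUBLE shown in the plan, not a bug in `conv` itself: conv('ffffffffffffffff',16,10) is 18446744073709551615 (2^64 - 1, an odd number), but the nearest representable double is 2^64, which is even, so the modulo comes out 0.0 in both Spark and Hive. A small Java demonstration:

```java
import java.math.BigInteger;

public class ConvPrecision {
    public static void main(String[] args) {
        // conv('ffffffffffffffff', 16, 10) = 18446744073709551615, which is odd.
        BigInteger exact = new BigInteger("ffffffffffffffff", 16);
        System.out.println(exact.mod(BigInteger.valueOf(2))); // 1: exact value is odd

        // Casting to DOUBLE rounds to the nearest representable value, 2^64,
        // which is even -- so the modulo becomes 0.0, matching the engines.
        double asDouble = Double.parseDouble(exact.toString());
        System.out.println(asDouble % 2);                     // 0.0
    }
}
```

This is why the "Won't Fix" resolution is defensible: the analyzer rule casting StringType operands to DoubleType loses integer precision by design, and Hive exhibits the identical behavior.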
[jira] [Updated] (SPARK-23508) blockManagerIdCache in BlockManagerId may cause oom
[ https://issues.apache.org/jira/browse/SPARK-23508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-23508: - Description: blockManagerIdCache in BlockManagerId will not remove old values which may cause oom {code:java} val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]() {code} Since whenever we apply a new BlockManagerId, it will put into this map. below is an jmap: !elepahnt-oom1.png! !elephant-oom.png! was: blockManagerIdCache in BlockManagerId will not remove old values which may cause oom {code:java} val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]() {code} Since whenever we apply a new BlockManagerId, it will put into this map. !elephant-oom.png! > blockManagerIdCache in BlockManagerId may cause oom > --- > > Key: SPARK-23508 > URL: https://issues.apache.org/jira/browse/SPARK-23508 > Project: Spark > Issue Type: Bug > Components: Deploy, Spark Core >Affects Versions: 2.1.1, 2.2.1 >Reporter: zhoukang >Priority: Major > Attachments: elepahnt-oom1.png, elephant-oom.png > > > blockManagerIdCache in BlockManagerId will not remove old values which may > cause oom > {code:java} > val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, > BlockManagerId]() > {code} > Since whenever we apply a new BlockManagerId, it will put into this map. > below is an jmap: > !elepahnt-oom1.png! > !elephant-oom.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23508) blockManagerIdCache in BlockManagerId may cause oom
[ https://issues.apache.org/jira/browse/SPARK-23508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-23508: - Description: blockManagerIdCache in BlockManagerId will not remove old values which may cause oom {code:java} val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]() {code} Since whenever we apply a new BlockManagerId, it will put into this map. !elephant-oom.png! was: blockManagerIdCache in BlockManagerId will not remove old values which may cause oom {code:java} val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]() {code} Since whenever we apply a new BlockManagerId, it will put into this map. > blockManagerIdCache in BlockManagerId may cause oom > --- > > Key: SPARK-23508 > URL: https://issues.apache.org/jira/browse/SPARK-23508 > Project: Spark > Issue Type: Bug > Components: Deploy, Spark Core >Affects Versions: 2.1.1, 2.2.1 >Reporter: zhoukang >Priority: Major > Attachments: elepahnt-oom1.png, elephant-oom.png > > > blockManagerIdCache in BlockManagerId will not remove old values which may > cause oom > {code:java} > val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, > BlockManagerId]() > {code} > Since whenever we apply a new BlockManagerId, it will put into this map. > !elephant-oom.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23508) blockManagerIdCache in BlockManagerId may cause oom
[ https://issues.apache.org/jira/browse/SPARK-23508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-23508: - Attachment: elephant-oom.png elepahnt-oom1.png > blockManagerIdCache in BlockManagerId may cause oom > --- > > Key: SPARK-23508 > URL: https://issues.apache.org/jira/browse/SPARK-23508 > Project: Spark > Issue Type: Bug > Components: Deploy, Spark Core >Affects Versions: 2.1.1, 2.2.1 >Reporter: zhoukang >Priority: Major > Attachments: elepahnt-oom1.png, elephant-oom.png > > > blockManagerIdCache in BlockManagerId will not remove old values which may > cause oom > {code:java} > val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, > BlockManagerId]() > {code} > Since whenever we apply a new BlockManagerId, it will put into this map. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23508) blockManagerIdCache in BlockManagerId may cause oom
zhoukang created SPARK-23508: Summary: blockManagerIdCache in BlockManagerId may cause oom Key: SPARK-23508 URL: https://issues.apache.org/jira/browse/SPARK-23508 Project: Spark Issue Type: Bug Components: Deploy, Spark Core Affects Versions: 2.2.1, 2.1.1 Reporter: zhoukang blockManagerIdCache in BlockManagerId never removes old values, which may cause an OOM: {code:java} val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]() {code} Whenever we request a new BlockManagerId, it is put into this map. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
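One way to keep such a cache from growing without bound is to cap its size and evict the least recently used entry. A sketch of the general idea (an illustration, not the patch that actually landed upstream), using a `LinkedHashMap` in access order with `removeEldestEntry`:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of a size-bounded replacement for the unbounded ConcurrentHashMap:
// a LinkedHashMap in access order that drops the least recently used entry
// once capacity is exceeded. Production code would also need thread safety,
// e.g. Collections.synchronizedMap or a concurrent cache library.
public class BoundedIdCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    public BoundedIdCache(int maxEntries) {
        super(16, 0.75f, /* accessOrder = */ true);
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Called after every put: evict the oldest entry when over capacity.
        return size() > maxEntries;
    }

    public static void main(String[] args) {
        BoundedIdCache<String, String> cache = new BoundedIdCache<>(2);
        cache.put("exec-1", "id1");
        cache.put("exec-2", "id2");
        cache.put("exec-3", "id3");  // evicts exec-1
        System.out.println(cache.keySet()); // prints [exec-2, exec-3]
    }
}
```

With a bound in place, long-running drivers that see many executors come and go (the jmap screenshots in the attachments) can no longer accumulate stale BlockManagerId entries indefinitely.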
[jira] [Updated] (SPARK-23335) expression 'select conv('ffffffffffffffff',16,10) % 2' return 0
[ https://issues.apache.org/jira/browse/SPARK-23335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-23335: - Description: For below expression: {code:java} select conv('',16,10) % 2; {code} it will return 0. {code:java} 0: jdbc:hive2://xxx:16> select conv('',16,10) % 2; +--+--+ | (CAST(conv(, 16, 10) AS DOUBLE) % CAST(CAST(2 AS DECIMAL(20,0)) AS DOUBLE)) | +--+--+ | 0.0 | +--+--+ {code} It caused by: {code:java} case a @ BinaryArithmetic(left @ StringType(), right) => a.makeCopy(Array(Cast(left, DoubleType), right)) case a @ BinaryArithmetic(left, right @ StringType()) => a.makeCopy(Array(left, Cast(right, DoubleType))) {code} was: For below expression: {code:java} select conv('',16,10) % 2; {code} it will return 0. {code:java} 0: jdbc:hive2://c3-hadoop-prc-history03.bj:16> select conv('',16,10) % 2; +--+--+ | (CAST(conv(, 16, 10) AS DOUBLE) % CAST(CAST(2 AS DECIMAL(20,0)) AS DOUBLE)) | +--+--+ | 0.0 | +--+--+ {code} It caused by: {code:java} case a @ BinaryArithmetic(left @ StringType(), right) => a.makeCopy(Array(Cast(left, DoubleType), right)) case a @ BinaryArithmetic(left, right @ StringType()) => a.makeCopy(Array(left, Cast(right, DoubleType))) {code} > expression 'select conv('',16,10) % 2' return 0 > --- > > Key: SPARK-23335 > URL: https://issues.apache.org/jira/browse/SPARK-23335 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: zhoukang >Priority: Major > > For below expression: > {code:java} > select conv('',16,10) % 2; > {code} > it will return 0. 
> {code:java} > 0: jdbc:hive2://xxx:16> select conv('',16,10) % 2; > > +--+--+ > > | (CAST(conv(, 16, 10) AS DOUBLE) % CAST(CAST(2 AS > DECIMAL(20,0)) AS DOUBLE)) | > > +--+--+ > | 0.0 | > +--+--+ > {code} > It caused by: > {code:java} > case a @ BinaryArithmetic(left @ StringType(), right) => > a.makeCopy(Array(Cast(left, DoubleType), right)) case a @ > BinaryArithmetic(left, right @ StringType()) => a.makeCopy(Array(left, > Cast(right, DoubleType))) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23335) expression 'select conv('ffffffffffffffff',16,10) % 2' return 0
[ https://issues.apache.org/jira/browse/SPARK-23335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-23335: - Description: For below expression: {code:java} select conv('',16,10) % 2; {code} it will return 0. {code:java} 0: jdbc:hive2://xxx:16> select conv('',16,10) % 2; +--+--+ | (CAST(conv(, 16, 10) AS DOUBLE) % CAST(CAST(2 AS DECIMAL(20,0)) AS DOUBLE)) | +--+--+ | 0.0 | +--+--+ {code} It caused by: {code:java} case a @ BinaryArithmetic(left @ StringType(), right) => a.makeCopy(Array(Cast(left, DoubleType), right)) case a @ BinaryArithmetic(left, right @ StringType()) => a.makeCopy(Array(left, Cast(right, DoubleType))) {code} was: For below expression: {code:java} select conv('',16,10) % 2; {code} it will return 0. {code:java} 0: jdbc:hive2://xxx:16> select conv('',16,10) % 2; +--+--+ | (CAST(conv(, 16, 10) AS DOUBLE) % CAST(CAST(2 AS DECIMAL(20,0)) AS DOUBLE)) | +--+--+ | 0.0 | +--+--+ {code} It caused by: {code:java} case a @ BinaryArithmetic(left @ StringType(), right) => a.makeCopy(Array(Cast(left, DoubleType), right)) case a @ BinaryArithmetic(left, right @ StringType()) => a.makeCopy(Array(left, Cast(right, DoubleType))) {code} > expression 'select conv('',16,10) % 2' return 0 > --- > > Key: SPARK-23335 > URL: https://issues.apache.org/jira/browse/SPARK-23335 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: zhoukang >Priority: Major > > For below expression: > {code:java} > select conv('',16,10) % 2; > {code} > it will return 0. 
> {code:java} > 0: jdbc:hive2://xxx:16> select conv('',16,10) % 2; > > +--+--+ > > | (CAST(conv(, 16, 10) AS DOUBLE) % CAST(CAST(2 AS > DECIMAL(20,0)) AS DOUBLE)) | > > +--+--+ > | 0.0 | > +--+--+ > {code} > It caused by: > {code:java} > case a @ BinaryArithmetic(left @ StringType(), right) => > a.makeCopy(Array(Cast(left, DoubleType), right)) > case a @ BinaryArithmetic(left, right @ StringType()) => > a.makeCopy(Array(left, Cast(right, DoubleType))) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23335) expression 'select conv('ffffffffffffffff',16,10) % 2' return 0
zhoukang created SPARK-23335: Summary: expression 'select conv('ffffffffffffffff',16,10) % 2' return 0 Key: SPARK-23335 URL: https://issues.apache.org/jira/browse/SPARK-23335 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.1.0 Reporter: zhoukang For the expression below: {code:java} select conv('ffffffffffffffff',16,10) % 2; {code} it will return 0. {code:java} 0: jdbc:hive2://c3-hadoop-prc-history03.bj:16> select conv('ffffffffffffffff',16,10) % 2; +--+--+ | (CAST(conv(ffffffffffffffff, 16, 10) AS DOUBLE) % CAST(CAST(2 AS DECIMAL(20,0)) AS DOUBLE)) | +--+--+ | 0.0 | +--+--+ {code} It is caused by: {code:java} case a @ BinaryArithmetic(left @ StringType(), right) => a.makeCopy(Array(Cast(left, DoubleType), right)) case a @ BinaryArithmetic(left, right @ StringType()) => a.makeCopy(Array(left, Cast(right, DoubleType))) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23129) Lazy init DiskMapIterator#deserializeStream to reduce memory usage when ExternalAppendOnlyMap spill too many times
[ https://issues.apache.org/jira/browse/SPARK-23129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-23129: - Description: Currently,the deserializeStream in ExternalAppendOnlyMap#DiskMapIterator init when DiskMapIterator instance created.This will cause memory use overhead when ExternalAppendOnlyMap spill too many times. We can avoid this by making deserializeStream init when it is used the first time. was: Currently,the deserializeStream in ExternalAppendOnlyMap#DiskMapIterator init when DiskMapIterator instance created.This will cause memory use overhead when ExternalAppendOnlyMap spill too much times. We can avoid this by making deserializeStream init when it is used the first time. > Lazy init DiskMapIterator#deserializeStream to reduce memory usage when > ExternalAppendOnlyMap spill too many times > --- > > Key: SPARK-23129 > URL: https://issues.apache.org/jira/browse/SPARK-23129 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: zhoukang >Assignee: zhoukang >Priority: Major > Fix For: 2.3.0 > > > Currently,the deserializeStream in ExternalAppendOnlyMap#DiskMapIterator init > when DiskMapIterator instance created.This will cause memory use overhead > when ExternalAppendOnlyMap spill too many times. > We can avoid this by making deserializeStream init when it is used the first > time. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23129) Lazy init DiskMapIterator#deserializeStream to reduce memory usage when ExternalAppendOnlyMap spill too many times
[ https://issues.apache.org/jira/browse/SPARK-23129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-23129: - Summary: Lazy init DiskMapIterator#deserializeStream to reduce memory usage when ExternalAppendOnlyMap spill too many times (was: Lazy init DiskMapIterator#deserializeStream to reduce memory usage when ExternalAppendOnlyMap spill too much times) > Lazy init DiskMapIterator#deserializeStream to reduce memory usage when > ExternalAppendOnlyMap spill too many times > --- > > Key: SPARK-23129 > URL: https://issues.apache.org/jira/browse/SPARK-23129 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: zhoukang >Assignee: zhoukang >Priority: Major > Fix For: 2.3.0 > > > Currently,the deserializeStream in ExternalAppendOnlyMap#DiskMapIterator init > when DiskMapIterator instance created.This will cause memory use overhead > when ExternalAppendOnlyMap spill too much times. > We can avoid this by making deserializeStream init when it is used the first > time. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23129) Lazy init DiskMapIterator#deserializeStream to reduce memory usage when ExternalAppendOnlyMap spill too much times
zhoukang created SPARK-23129: Summary: Lazy init DiskMapIterator#deserializeStream to reduce memory usage when ExternalAppendOnlyMap spill too much times Key: SPARK-23129 URL: https://issues.apache.org/jira/browse/SPARK-23129 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.1.0 Reporter: zhoukang Currently, the deserializeStream in ExternalAppendOnlyMap#DiskMapIterator is initialized when the DiskMapIterator instance is created. This causes memory overhead when ExternalAppendOnlyMap spills too many times. We can avoid this by initializing deserializeStream lazily, the first time it is used. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
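The improvement above is plain lazy initialization: defer constructing the expensive stream (and its buffers) until the first read, so spill-file iterators that are never consumed cost nothing. A minimal Java sketch of the pattern (the actual change is in Spark's Scala `ExternalAppendOnlyMap`; the `Lazy` wrapper here is illustrative):

```java
import java.util.function.Supplier;

// Sketch of lazy initialization: the expensive resource (standing in for
// DiskMapIterator's deserializeStream) is only constructed on first access,
// so iterators over spill files that are never read allocate no buffers.
public class Lazy<T> {
    private final Supplier<T> factory;
    private T value;
    private boolean initialized;

    public Lazy(Supplier<T> factory) { this.factory = factory; }

    public synchronized T get() {
        if (!initialized) {
            value = factory.get();   // runs at most once
            initialized = true;
        }
        return value;
    }

    public synchronized boolean isInitialized() { return initialized; }

    public static void main(String[] args) {
        Lazy<String> stream = new Lazy<>(() -> {
            System.out.println("opening stream");   // expensive work deferred
            return "deserializeStream";
        });
        System.out.println(stream.isInitialized()); // false: nothing opened yet
        stream.get();
        System.out.println(stream.isInitialized()); // true
    }
}
```

When a map spills N times, eager construction holds N streams' worth of buffers at once; with the lazy form, only the streams actually being read hold memory.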
[jira] [Commented] (SPARK-23076) When we call cache() on RDD which depends on ShuffleRowRDD, we will get an error result
[ https://issues.apache.org/jira/browse/SPARK-23076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326685#comment-16326685 ] zhoukang commented on SPARK-23076: -- Yes, maybe it should be an improvement, since I suppose others may have the same requirement. The original reason we cache the MapPartitionsRDD is that we estimate the result size in SparkExecuteStatementOperation#execute, so we cache the result RDD first. We estimate the result size to avoid a Thrift server OOM error. [~cloud_fan] > When we call cache() on RDD which depends on ShuffleRowRDD, we will get an > error result > --- > > Key: SPARK-23076 > URL: https://issues.apache.org/jira/browse/SPARK-23076 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: zhoukang >Priority: Major > Attachments: shufflerowrdd-cache.png > > > For query below: > {code:java} > select * from csv_demo limit 3; > {code} > The correct result should be: > 0: jdbc:hive2://10.108.230.228:1/> select * from csv_demo limit 3; > ++++-- > |_c0|_c1| > ++++-- > |Joe|20| > |Tom|30| > |Hyukjin|25| > ++++-- > However, when we call cache on the MapPartitionsRDD below: > !shufflerowrdd-cache.png! 
> Then result will be error: > 0: jdbc:hive2://xxx/> select * from csv_demo limit 3; > ++++-- > |_c0|_c1| > ++++-- > |Hyukjin|25| > |Hyukjin|25| > |Hyukjin|25| > ++++-- > The reason why this happen is that: > UnsafeRow which generated by ShuffleRowRDD#compute will use the same under > byte buffer > I print some log below to explain this: > Modify UnsafeRow.toString() > {code:java} > // This is for debugging > @Override > public String toString() { > StringBuilder build = new StringBuilder("["); > for (int i = 0; i < sizeInBytes; i += 8) { > if (i != 0) build.append(','); > build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, > baseOffset + i))); > } > build.append(","+baseObject+","+baseOffset+']'); // Print baseObject and > baseOffset here > return build.toString(); > }{code} > {code:java} > 2018-01-12,22:08:47,441 INFO org.apache.spark.sql.execution.ShuffledRowRDD: > Read value: [0,180003,22,656f4a,3032,[B@6225ec90,16] > 2018-01-12,22:08:47,445 INFO org.apache.spark.sql.execution.ShuffledRowRDD: > Read value: [0,180003,22,6d6f54,3033,[B@6225ec90,16] > 2018-01-12,22:08:47,448 INFO org.apache.spark.sql.execution.ShuffledRowRDD: > Read value: [0,180007,22,6e696a6b757948,3532,[B@6225ec90,16] > {code} > we can fix this by add a config,and copy UnsafeRow read by ShuffleRowRDD > iterator when config is true,like below: > {code:java} > reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(x => { > if (SparkEnv.get.conf.get(StaticSQLConf.UNSAFEROWRDD_CACHE_ENABLE) > && x._2.isInstanceOf[UnsafeRow]) { > (x._2).asInstanceOf[UnsafeRow].copy() > } else { > x._2 > } > }) > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
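The wrong cached result above is the classic row-object-reuse hazard: the shuffle iterator hands back the same UnsafeRow backed by one byte buffer (note the identical `[B@6225ec90` baseObject in the debug log), so caching the references without copying leaves every cached slot pointing at the last row written. A small Java analogue, with a reused mutable row standing in for UnsafeRow and a defensive `copy()` standing in for `UnsafeRow.copy()` as in the proposed fix:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Analogue of UnsafeRow reuse: the iterator returns the SAME mutable object
// for every element, overwriting its contents in place. Caching references
// without copying therefore records only the final value.
public class RowReuseDemo {
    static class Row {
        String value;
        Row copy() { Row r = new Row(); r.value = value; return r; }
    }

    static Iterator<Row> reusingIterator(List<String> data) {
        Row shared = new Row();                 // one buffer, reused per element
        Iterator<String> source = data.iterator();
        return new Iterator<Row>() {
            public boolean hasNext() { return source.hasNext(); }
            public Row next() { shared.value = source.next(); return shared; }
        };
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("Joe", "Tom", "Hyukjin");

        List<Row> cachedNoCopy = new ArrayList<>();
        reusingIterator(data).forEachRemaining(cachedNoCopy::add);
        // All three entries alias the shared row, so every value is the last one.
        System.out.println(cachedNoCopy.get(0).value);   // Hyukjin

        List<Row> cachedWithCopy = new ArrayList<>();
        reusingIterator(data).forEachRemaining(r -> cachedWithCopy.add(r.copy()));
        // Copying each row before caching preserves the distinct values.
        System.out.println(cachedWithCopy.get(0).value); // Joe
    }
}
```

This is exactly why the suggested fix copies each UnsafeRow read from the ShuffleRowRDD iterator before it reaches `cache()`: the copy detaches the row from the shared underlying buffer.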
[jira] [Updated] (SPARK-23076) When we call cache() on RDD which depends on ShuffleRowRDD, we will get an error result
[ https://issues.apache.org/jira/browse/SPARK-23076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-23076:
    Issue Type: Improvement  (was: Bug)

> When we call cache() on RDD which depends on ShuffleRowRDD, we will get an error result
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-23076
>                 URL: https://issues.apache.org/jira/browse/SPARK-23076
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: zhoukang
>           Priority: Major
>        Attachments: shufflerowrdd-cache.png
>
> For the query below:
> {code:java}
> select * from csv_demo limit 3;
> {code}
> the correct result is:
> {noformat}
> 0: jdbc:hive2://10.108.230.228:1/> select * from csv_demo limit 3;
> +----------+------+
> |   _c0    | _c1  |
> +----------+------+
> | Joe      | 20   |
> | Tom      | 30   |
> | Hyukjin  | 25   |
> +----------+------+
> {noformat}
> However, when we call cache() on the MapPartitionsRDD below:
> !shufflerowrdd-cache.png!
> the result is wrong:
> {noformat}
> 0: jdbc:hive2://xxx/> select * from csv_demo limit 3;
> +----------+------+
> |   _c0    | _c1  |
> +----------+------+
> | Hyukjin  | 25   |
> | Hyukjin  | 25   |
> | Hyukjin  | 25   |
> +----------+------+
> {noformat}
> This happens because the UnsafeRow instances produced by ShuffledRowRDD#compute all share the same underlying byte buffer, so each row read overwrites the previous one. To demonstrate this, I modified UnsafeRow.toString() to also print the backing object and offset:
> {code:java}
> // This is for debugging
> @Override
> public String toString() {
>   StringBuilder build = new StringBuilder("[");
>   for (int i = 0; i < sizeInBytes; i += 8) {
>     if (i != 0) build.append(',');
>     build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, baseOffset + i)));
>   }
>   build.append("," + baseObject + "," + baseOffset + ']'); // Print baseObject and baseOffset here
>   return build.toString();
> }
> {code}
> {code:java}
> 2018-01-12,22:08:47,441 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,180003,22,656f4a,3032,[B@6225ec90,16]
> 2018-01-12,22:08:47,445 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,180003,22,6d6f54,3033,[B@6225ec90,16]
> 2018-01-12,22:08:47,448 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,180007,22,6e696a6b757948,3532,[B@6225ec90,16]
> {code}
> All three rows report the same baseObject ([B@6225ec90). We can fix this by adding a config and copying each UnsafeRow read by the ShuffledRowRDD iterator when the config is enabled, like below:
> {code:java}
> reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map { x =>
>   if (SparkEnv.get.conf.get(StaticSQLConf.UNSAFEROWRDD_CACHE_ENABLE) &&
>       x._2.isInstanceOf[UnsafeRow]) {
>     x._2.asInstanceOf[UnsafeRow].copy()
>   } else {
>     x._2
>   }
> }
> {code}
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
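The buffer-reuse pitfall described above can be reproduced outside Spark. The sketch below is plain Java, not Spark code: the `Row` class and `reusingIterator` are hypothetical stand-ins for UnsafeRow and the shuffle reader. It shows why caching references to a reused row object stores N aliases of the last row, and why copying each row (the proposed fix) restores correct results.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class RowReuseDemo {
    // A tiny stand-in for UnsafeRow: a row that is just a view over a buffer.
    static class Row {
        byte[] buffer; // backing buffer, like UnsafeRow's baseObject
        Row(byte[] buffer) { this.buffer = buffer; }
        Row copy() { return new Row(Arrays.copyOf(buffer, buffer.length)); }
        String value() { return new String(buffer).trim(); }
    }

    // An iterator that, like the shuffle reader, returns the SAME Row object
    // each time, rewriting its backing buffer in place.
    static Iterator<Row> reusingIterator(List<String> values) {
        byte[] shared = new byte[8];
        Row row = new Row(shared);
        Iterator<String> it = values.iterator();
        return new Iterator<Row>() {
            public boolean hasNext() { return it.hasNext(); }
            public Row next() {
                byte[] bytes = it.next().getBytes();
                Arrays.fill(shared, (byte) ' ');
                System.arraycopy(bytes, 0, shared, 0, bytes.length);
                return row; // same object, mutated buffer
            }
        };
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("Joe", "Tom", "Hyukjin");

        // Caching the rows without copying: every cached entry aliases the
        // same object, so all entries show the last value written.
        List<Row> cachedWrong = new ArrayList<>();
        reusingIterator(data).forEachRemaining(cachedWrong::add);
        cachedWrong.forEach(r -> System.out.println("no copy: " + r.value()));
        // All three lines print "Hyukjin" -- the symptom from the report.

        // The proposed fix: copy each row before caching it.
        List<Row> cachedRight = new ArrayList<>();
        reusingIterator(data).forEachRemaining(r -> cachedRight.add(r.copy()));
        cachedRight.forEach(r -> System.out.println("copied:  " + r.value()));
        // Prints Joe, Tom, Hyukjin.
    }
}
```

The same aliasing is why UnsafeRow.copy() in the map over the shuffle reader's iterator is sufficient: each cached entry then owns its own buffer.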
[jira] [Updated] (SPARK-23076) When we call cache() on RDD which depends on ShuffleRowRDD, we will get an error result
[ https://issues.apache.org/jira/browse/SPARK-23076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-23076:
    Summary: When we call cache() on RDD which depends on ShuffleRowRDD, we will get an error result  (was: When we call cache() on ShuffleRowRDD, we will get an error result)
[jira] [Comment Edited] (SPARK-23076) When we call cache() on ShuffleRowRDD, we will get an error result
[ https://issues.apache.org/jira/browse/SPARK-23076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326242#comment-16326242 ] zhoukang edited comment on SPARK-23076 at 1/15/18 1:34 PM:
---
As the picture I posted shows, I cached a MapPartitionsRDD which depends on ShuffleRowRDD. We use this for our internal Spark SQL service. [~cloud_fan]

was (Author: cane): As the picture i posted, i cached MapPartitionsRDD which depends on ShuffleRowRDD. We used this our internal spark sql service [~cloud_fan]
[jira] [Commented] (SPARK-23076) When we call cache() on ShuffleRowRDD, we will get an error result
[ https://issues.apache.org/jira/browse/SPARK-23076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326242#comment-16326242 ] zhoukang commented on SPARK-23076:
--
As the picture I posted shows, I cached a MapPartitionsRDD which depends on ShuffleRowRDD. We use this for our internal Spark SQL service. [~cloud_fan]
[jira] [Updated] (SPARK-23076) When we call cache() on ShuffleRowRDD, we will get an error result
[ https://issues.apache.org/jira/browse/SPARK-23076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-23076:
    Description: (edited)
[jira] [Updated] (SPARK-23076) When we call cache() on ShuffleRowRDD, we will get an error result
[ https://issues.apache.org/jira/browse/SPARK-23076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-23076:
    Description: (edited)
[jira] [Updated] (SPARK-23076) When we call cache() on ShuffleRowRDD, we will get an error result
[ https://issues.apache.org/jira/browse/SPARK-23076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-23076:
    Description: (edited)
[jira] [Updated] (SPARK-23076) When we call cache() on ShuffleRowRDD, we will get an error result
[ https://issues.apache.org/jira/browse/SPARK-23076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-23076:
    Description: (edited)
[jira] [Updated] (SPARK-23076) When we call cache() on ShuffleRowRDD, we will get an error result
[ https://issues.apache.org/jira/browse/SPARK-23076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhoukang updated SPARK-23076:
    Attachment: shufflerowrdd-cache.png