[jira] [Assigned] (SPARK-22347) UDF is evaluated when 'F.when' condition is false
[ https://issues.apache.org/jira/browse/SPARK-22347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-22347:
------------------------------------

    Assignee:     (was: Apache Spark)

> UDF is evaluated when 'F.when' condition is false
> -------------------------------------------------
>
>                 Key: SPARK-22347
>                 URL: https://issues.apache.org/jira/browse/SPARK-22347
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.2.0
>            Reporter: Nicolas Porter
>            Priority: Minor
>
> Here's a simple example of how to reproduce this:
> {code}
> from pyspark.sql import functions as F, Row, types
>
> def Divide10():
>     def fn(value): return 10 / int(value)
>     return F.udf(fn, types.IntegerType())
>
> df = sc.parallelize([Row(x=5), Row(x=0)]).toDF()
> x = F.col('x')
> df2 = df.select(F.when((x > 0), Divide10()(x)))
> df2.show(200)
> {code}
> This raises a division by zero error, even though `F.when` is trying to filter
> out all cases where `x <= 0`. I believe the correct behavior should be not to
> evaluate the UDF when the `F.when` condition is false.
> Interestingly enough, when the `F.when` condition is set to `F.lit(False)`,
> the error is not raised and all rows resolve to `null`, which is the
> expected result.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-22347) UDF is evaluated when 'F.when' condition is false
[ https://issues.apache.org/jira/browse/SPARK-22347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-22347:
------------------------------------

    Assignee: Apache Spark
[jira] [Commented] (SPARK-22347) UDF is evaluated when 'F.when' condition is false
[ https://issues.apache.org/jira/browse/SPARK-22347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221719#comment-16221719 ]

Apache Spark commented on SPARK-22347:
--------------------------------------

User 'viirya' has created a pull request for this issue:
https://github.com/apache/spark/pull/19584
[jira] [Comment Edited] (SPARK-22347) UDF is evaluated when 'F.when' condition is false
[ https://issues.apache.org/jira/browse/SPARK-22347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221612#comment-16221612 ]

Liang-Chi Hsieh edited comment on SPARK-22347 at 10/27/17 5:06 AM:
-------------------------------------------------------------------

Under the current execution mode of Python UDFs, I think it is hard to support
Python UDFs as branch values or the else value in a CaseWhen expression. The
execution of batch/vectorized Python UDFs evaluates all the UDFs in an operator
at once, so it is not easy to support conditional execution. I'd rather disable
the use of Python UDFs in CaseWhen.

It is easy to incorporate the condition logic of CaseWhen into the Python UDF
itself, e.g. for the above example:

{code}
def divideByValue():
    def fn(value): return 10 / int(value) if (value > 0) else None
    return udf(fn, types.IntegerType())

df2 = df.select(divideByValue()(x))
df2.show()
+-----+
|fn(x)|
+-----+
|    2|
| null|
+-----+
{code}

was (Author: viirya):
Under the current execution mode of Python UDFs, I think it is hard to support
Python UDFs as branch values or the else value in a CaseWhen expression. The
execution of batch/vectorized Python UDFs evaluates all the UDFs in an operator
at once, so it is not easy to support conditional execution. I'd rather disable
the use of Python UDFs in CaseWhen.

It is easy to incorporate the condition logic of CaseWhen into the Python UDF
itself, e.g. for the above example:

{code}
def divideByValue():
    def fn(value): return 10 / int(value) if (value > 0) else None
    return udf(fn, types.IntegerType())

df2 = df.select(divideByValue()(x))
df2.show()
+--------------------------------+
|CASE WHEN (x > 0) THEN fn(x) END|
+--------------------------------+
|                               2|
|                            null|
+--------------------------------+
{code}
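Stripped of Spark, the workaround in the comment above is just a guard inside the function. A minimal pure-Python sketch of that condition-inside-the-UDF pattern (plain functions stand in for the UDF; names are illustrative, not Spark API):

```python
def divide_by_value(value):
    # The CASE WHEN condition is folded into the function body, so the
    # function is safe to evaluate on every row, including value == 0.
    # Integer division mirrors the IntegerType() return type of the UDF.
    return 10 // int(value) if value > 0 else None

# Stand-in for the x column of the example DataFrame: Row(x=5), Row(x=0).
rows = [5, 0]
result = [divide_by_value(v) for v in rows]
print(result)  # [2, None]
```

Because the guard runs inside the function, it holds regardless of whether the engine evaluates the expression eagerly or lazily.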
[jira] [Comment Edited] (SPARK-22347) UDF is evaluated when 'F.when' condition is false
[ https://issues.apache.org/jira/browse/SPARK-22347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221612#comment-16221612 ]

Liang-Chi Hsieh edited comment on SPARK-22347 at 10/27/17 5:05 AM:
-------------------------------------------------------------------

Under the current execution mode of Python UDFs, I think it is hard to support
Python UDFs as branch values or the else value in a CaseWhen expression. The
execution of batch/vectorized Python UDFs evaluates all the UDFs in an operator
at once, so it is not easy to support conditional execution. I'd rather disable
the use of Python UDFs in CaseWhen.

It is easy to incorporate the condition logic of CaseWhen into the Python UDF
itself, e.g. for the above example:

{code}
def divideByValue():
    def fn(value): return 10 / int(value) if (value > 0) else None
    return udf(fn, types.IntegerType())

df2 = df.select(divideByValue()(x))
df2.show()
+--------------------------------+
|CASE WHEN (x > 0) THEN fn(x) END|
+--------------------------------+
|                               2|
|                            null|
+--------------------------------+
{code}

was (Author: viirya):
Under the current execution mode of Python UDFs, I think it is hard to support
Python UDFs as branch values or the else value in a CaseWhen expression. The
execution of batch/vectorized Python UDFs evaluates all the UDFs in an operator
at once, so it is not easy to support conditional execution. I'd rather disable
the use of Python UDFs in CaseWhen.

It is easy to incorporate the condition logic of CaseWhen into the Python UDF
itself, e.g. for the above example:

{code}
def Divide10():
    def fn(value): return 10 / int(value) if (value > 0) else None
    return udf(fn, types.IntegerType())

df2 = df.select(when((x > 0), Divide10()(x)))
df2.show()
+--------------------------------+
|CASE WHEN (x > 0) THEN fn(x) END|
+--------------------------------+
|                               2|
|                            null|
+--------------------------------+
{code}
[jira] [Commented] (SPARK-18755) Add Randomized Grid Search to Spark ML
[ https://issues.apache.org/jira/browse/SPARK-18755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221687#comment-16221687 ]

Ilya Matiach commented on SPARK-18755:
--------------------------------------

I've created a PR that adds randomized grid search to the mmlspark package here:
https://github.com/Azure/mmlspark/pull/168

> Add Randomized Grid Search to Spark ML
> --------------------------------------
>
>                 Key: SPARK-18755
>                 URL: https://issues.apache.org/jira/browse/SPARK-18755
>             Project: Spark
>          Issue Type: Improvement
>          Components: ML
>            Reporter: yuhao yang
>
> Randomized grid search implements a randomized search over parameters, where
> each setting is sampled from a distribution over possible parameter values.
> This has two main benefits over an exhaustive search:
> 1. A budget can be chosen independent of the number of parameters and
> possible values.
> 2. Adding parameters that do not influence the performance does not decrease
> efficiency.
> Randomized grid search usually gives results similar to an exhaustive search,
> while its run time is drastically lower.
> For more background, please refer to:
> sklearn: http://scikit-learn.org/stable/modules/grid_search.html
> http://blog.kaggle.com/2015/07/16/scikit-learn-video-8-efficiently-searching-for-optimal-tuning-parameters/
> http://www.jmlr.org/papers/volume13/bergstra12a/bergstra12a.pdf
> https://www.r-bloggers.com/hyperparameter-optimization-in-h2o-grid-search-random-search-and-the-future/
> There are two ways to implement this in Spark as I see it:
> 1. Add a searchRatio to ParamGridBuilder and conduct sampling directly during
> build. Only one new public function is required.
> 2. Add a trait RandomizedSearch and create new classes RandomizedCrossValidator
> and RandomizedTrainValidationSplit, which can be complicated since we need to
> deal with the models.
> I'd prefer option 1 as it's much simpler and more straightforward: we can
> support randomized grid search with a minimal change.
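Option 1 from the issue description (sampling the parameter grid at build time) can be sketched in plain Python. `random_grid` and `search_ratio` are hypothetical names for illustration, not Spark's ParamGridBuilder API:

```python
import itertools
import random

def random_grid(param_grid, search_ratio, seed=None):
    """Sample a fraction of the full Cartesian parameter grid
    (a sketch of the proposed searchRatio option, not Spark code)."""
    names = sorted(param_grid)
    # Build the exhaustive grid first, then sample a subset of it.
    full = [dict(zip(names, combo))
            for combo in itertools.product(*(param_grid[n] for n in names))]
    k = max(1, int(len(full) * search_ratio))
    return random.Random(seed).sample(full, k)

# Hypothetical hyperparameter candidates: 3 x 4 = 12 combinations.
grid = {"regParam": [0.01, 0.1, 1.0], "maxIter": [10, 50, 100, 200]}
subset = random_grid(grid, search_ratio=0.5, seed=42)
print(len(subset))  # 6 of the 12 combinations
```

The budget (`k`) depends only on `search_ratio`, so adding more candidate values grows the pool being sampled, not the number of models trained — the two benefits the issue lists.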
[jira] [Comment Edited] (SPARK-22347) UDF is evaluated when 'F.when' condition is false
[ https://issues.apache.org/jira/browse/SPARK-22347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221612#comment-16221612 ]

Liang-Chi Hsieh edited comment on SPARK-22347 at 10/27/17 2:44 AM:
-------------------------------------------------------------------

Under the current execution mode of Python UDFs, I think it is hard to support
Python UDFs as branch values or the else value in a CaseWhen expression. The
execution of batch/vectorized Python UDFs evaluates all the UDFs in an operator
at once, so it is not easy to support conditional execution. I'd rather disable
the use of Python UDFs in CaseWhen.

It is easy to incorporate the condition logic of CaseWhen into the Python UDF
itself, e.g. for the above example:

{code}
def Divide10():
    def fn(value): return 10 / int(value) if (value > 0) else None
    return udf(fn, types.IntegerType())

df2 = df.select(when((x > 0), Divide10()(x)))
df2.show()
+--------------------------------+
|CASE WHEN (x > 0) THEN fn(x) END|
+--------------------------------+
|                               2|
|                            null|
+--------------------------------+
{code}

was (Author: viirya):
Under the current execution mode of Python UDFs, I think it is hard to support
Python UDFs as branch values or the else value in a CaseWhen expression. The
execution of batch/vectorized Python UDFs evaluates all the UDFs in an operator
at once, so it is not easy to support conditional execution. I'd rather disable
the use of Python UDFs in CaseWhen.

It is easy to incorporate the condition logic of CaseWhen into the Python UDF
itself, e.g. for the above example:

{code}
def Divide10():
    def fn(value): return 10 / int(value) if (value > 0) else None
    return udf(fn, types.IntegerType())

df2 = df.select(when((x > 0), Divide10()(x)))
df2.show()
+--------------------------------+
|CASE WHEN (x > 0) THEN fn(x) END|
+--------------------------------+
|                               2|
|                            null|
+--------------------------------+
{code}
[jira] [Commented] (SPARK-22347) UDF is evaluated when 'F.when' condition is false
[ https://issues.apache.org/jira/browse/SPARK-22347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221612#comment-16221612 ]

Liang-Chi Hsieh commented on SPARK-22347:
-----------------------------------------

Under the current execution mode of Python UDFs, I think it is hard to support
Python UDFs as branch values or the else value in a CaseWhen expression. The
execution of batch/vectorized Python UDFs evaluates all the UDFs in an operator
at once, so it is not easy to support conditional execution. I'd rather disable
the use of Python UDFs in CaseWhen.

It is easy to incorporate the condition logic of CaseWhen into the Python UDF
itself, e.g. for the above example:

{code}
def Divide10():
    def fn(value): return 10 / int(value) if (value > 0) else None
    return udf(fn, types.IntegerType())

df2 = df.select(when((x > 0), Divide10()(x)))
df2.show()
+--------------------------------+
|CASE WHEN (x > 0) THEN fn(x) END|
+--------------------------------+
|                               2|
|                            null|
+--------------------------------+
{code}
[jira] [Resolved] (SPARK-22355) Dataset.collect is not threadsafe
[ https://issues.apache.org/jira/browse/SPARK-22355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xiao Li resolved SPARK-22355.
-----------------------------
       Resolution: Fixed
    Fix Version/s: 2.3.0
                   2.2.1

> Dataset.collect is not threadsafe
> ---------------------------------
>
>                 Key: SPARK-22355
>                 URL: https://issues.apache.org/jira/browse/SPARK-22355
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.0
>            Reporter: Wenchen Fan
>            Assignee: Wenchen Fan
>             Fix For: 2.2.1, 2.3.0
[jira] [Resolved] (SPARK-22356) data source table should support overlapped columns between data and partition schema
[ https://issues.apache.org/jira/browse/SPARK-22356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xiao Li resolved SPARK-22356.
-----------------------------
       Resolution: Fixed
    Fix Version/s: 2.3.0
                   2.2.1

> data source table should support overlapped columns between data and
> partition schema
> --------------------------------------------------------------------
>
>                 Key: SPARK-22356
>                 URL: https://issues.apache.org/jira/browse/SPARK-22356
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.0
>            Reporter: Wenchen Fan
>            Assignee: Wenchen Fan
>             Fix For: 2.2.1, 2.3.0
[jira] [Resolved] (SPARK-22366) Support ignoreMissingFiles flag parallel to ignoreCorruptFiles
[ https://issues.apache.org/jira/browse/SPARK-22366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu resolved SPARK-22366.
----------------------------------
       Resolution: Fixed
         Assignee: Jose Torres
    Fix Version/s: 2.3.0

> Support ignoreMissingFiles flag parallel to ignoreCorruptFiles
> --------------------------------------------------------------
>
>                 Key: SPARK-22366
>                 URL: https://issues.apache.org/jira/browse/SPARK-22366
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.2.0
>            Reporter: Jose Torres
>            Assignee: Jose Torres
>            Priority: Minor
>             Fix For: 2.3.0
>
> There's an existing flag "spark.sql.files.ignoreCorruptFiles" that will
> quietly ignore attempted reads from files that have been corrupted, but it
> still allows the query to fail on missing files. Being able to ignore missing
> files too is useful in some replication scenarios.
> We should add a "spark.sql.files.ignoreMissingFiles" to fill out the
> functionality.
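The semantics the issue asks for can be illustrated without Spark: a reader that skips missing (or corrupt) inputs instead of failing the whole job. A hedged pure-Python sketch of the two flags' intended behavior, not Spark's implementation:

```python
import os
import tempfile

def read_files(paths, ignore_missing=False, ignore_corrupt=False):
    """Sketch of the flag semantics: skip missing files when
    ignore_missing is set (spark.sql.files.ignoreMissingFiles), and
    skip unreadable files when ignore_corrupt is set
    (spark.sql.files.ignoreCorruptFiles). Otherwise fail fast."""
    out = []
    for p in paths:
        if not os.path.exists(p):
            if ignore_missing:
                continue  # tolerate the missing file, keep reading
            raise FileNotFoundError(p)
        try:
            with open(p) as f:
                out.append(f.read())
        except OSError:
            if not ignore_corrupt:
                raise  # by default a bad file fails the whole read
    return out

# One present file and one that was deleted (e.g. by replication cleanup).
tmp = tempfile.mkdtemp()
ok = os.path.join(tmp, "part-0.txt")
with open(ok, "w") as f:
    f.write("data")
missing = os.path.join(tmp, "part-1.txt")
result = read_files([ok, missing], ignore_missing=True)
print(result)  # ['data']
```

Without `ignore_missing=True`, the same call raises `FileNotFoundError`, which mirrors the current behavior the issue wants to make optional.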
[jira] [Commented] (SPARK-22344) Prevent R CMD check from using /tmp
[ https://issues.apache.org/jira/browse/SPARK-22344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221460#comment-16221460 ]

Hossein Falaki commented on SPARK-22344:
----------------------------------------

I don't have a solid pointer as to why we are creating these temp directories
for Hive. I think it would be nicer to fix this in Spark. It is good practice
not to leave files in /tmp.

> Prevent R CMD check from using /tmp
> -----------------------------------
>
>                 Key: SPARK-22344
>                 URL: https://issues.apache.org/jira/browse/SPARK-22344
>             Project: Spark
>          Issue Type: Bug
>          Components: SparkR
>    Affects Versions: 1.6.3, 2.1.2, 2.2.0, 2.3.0
>            Reporter: Shivaram Venkataraman
>
> When R CMD check is run on the SparkR package it leaves behind files in /tmp,
> which is a violation of CRAN policy. We should instead write to Rtmpdir.
> Notes from CRAN are below:
> {code}
> Checking this leaves behind dirs
>    hive/$USER
>    $USER
> and files named like
>    b4f6459b-0624-4100-8358-7aa7afbda757_resources
> in /tmp, in violation of the CRAN Policy.
> {code}
[jira] [Commented] (SPARK-22305) HDFSBackedStateStoreProvider fails with StackOverflowException when attempting to recover state
[ https://issues.apache.org/jira/browse/SPARK-22305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221432#comment-16221432 ]

Shixiong Zhu commented on SPARK-22305:
--------------------------------------

[~Yuval.Itzchakov] how many batches per minute does your query run? If there
are a lot of batches, you can try to run your application with
`--conf spark.sql.streaming.stateStore.maintenanceInterval=10s` to set a small
interval as a workaround. However, we definitely should fix this by rewriting
this code in a non-recursive way.

> HDFSBackedStateStoreProvider fails with StackOverflowException when
> attempting to recover state
> -------------------------------------------------------------------
>
>                 Key: SPARK-22305
>                 URL: https://issues.apache.org/jira/browse/SPARK-22305
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Yuval Itzchakov
>
> Environment:
> Spark: 2.2.0
> Java version: 1.8.0_112
> spark.sql.streaming.minBatchesToRetain: 100
>
> After an application failure due to OOM exceptions, restarting the
> application with the existing state produces the following OOM:
> {code:java}
> java.io.IOException: com.google.protobuf.ServiceException: java.lang.StackOverflowError
>   at org.apache.hadoop.ipc.ProtobufHelper.getRemoteException(ProtobufHelper.java:47)
>   at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:260)
>   at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
>   at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>   at com.sun.proxy.$Proxy18.getBlockLocations(Unknown Source)
>   at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1240)
>   at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1227)
>   at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1215)
>   at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:303)
>   at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:269)
>   at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:261)
>   at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1540)
>   at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
>   at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
>   at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>   at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
>   at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:767)
>   at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$readSnapshotFile(HDFSBackedStateStoreProvider.scala:405)
>   at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:296)
>   at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:295)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:295)
>   at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$4.apply(HDFSBackedStateStoreProvider.scala:297)
>   at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$4.apply(HDFSBackedStateStoreProvider.scala:296)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:296)
>   at
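The truncated stack trace above shows loadMap calling itself once per retained delta file. The non-recursive rewrite suggested in the comment amounts to replacing that "load previous version, then apply my delta" call chain with a loop. A pure-Python sketch of the idea (hypothetical structure, not the actual HDFSBackedStateStoreProvider code):

```python
def load_version(version, snapshots, deltas):
    """Recover state for `version` by walking back to the nearest
    snapshot, then replaying deltas forward with a loop instead of
    recursion, so a deep chain of deltas cannot overflow the stack."""
    base = version
    while base > 0 and base not in snapshots:
        base -= 1  # walk back iteratively; no recursive call per version
    state = dict(snapshots.get(base, {}))
    for v in range(base + 1, version + 1):
        state.update(deltas[v])  # replay each delta file in order
    return state

# 5000 deltas: far deeper than Python's default recursion limit (~1000),
# yet the loop-based replay recovers the state without a stack overflow.
snapshots = {0: {}}
deltas = {v: {v: v * v} for v in range(1, 5001)}
state = load_version(5000, snapshots, deltas)
print(len(state))  # 5000
```

The same shape (find base, replay forward) is what a stack-safe Scala rewrite would need; the key property is that depth of the delta chain becomes loop iterations, not stack frames.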
[jira] [Assigned] (SPARK-22339) Push epoch updates to executors on fetch failure to avoid fetch retries for missing executors
[ https://issues.apache.org/jira/browse/SPARK-22339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-22339:
------------------------------------

    Assignee:     (was: Apache Spark)

> Push epoch updates to executors on fetch failure to avoid fetch retries for
> missing executors
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-22339
>                 URL: https://issues.apache.org/jira/browse/SPARK-22339
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle, Spark Core
>    Affects Versions: 2.2.0
>            Reporter: Juan Rodríguez Hortalá
>         Attachments: push_epoch_update-WIP.diff
>
> When a task finishes with an error due to a fetch failure, DAGScheduler
> unregisters the shuffle blocks hosted by the serving executor (or even all
> the executors in the failing host, with external shuffle and
> spark.files.fetchFailure.unRegisterOutputOnHost enabled) in the shuffle block
> directory stored by MapOutputTracker, which then increments its epoch as a
> result. This event is only signaled to the other executors when a new task
> with a new epoch starts in each executor. This means that other executors
> reading from the failed executors will retry fetching shuffle blocks from
> them, even though the driver already knows those executors are lost and those
> blocks are now unavailable at those locations. This impacts job runtime,
> especially for long shuffles and executor failures at the end of a stage,
> when the only pending tasks are shuffle reads.
> This could be improved by pushing the epoch update to the executors without
> having to wait for a new task. In the attached patch I sketch a possible
> solution that sends the updated epoch from the driver to the executors by
> piggybacking on the executor heartbeat response. ShuffleBlockFetcherIterator,
> RetryingBlockFetcher and BlockFetchingListener are modified so block
> locations are checked on each fetch retry. This doesn't introduce additional
> traffic, as MapOutputTrackerWorker.mapStatuses is shared by all tasks running
> on the same executor, and the lookup of the new shuffle blocks directory was
> going to happen anyway when the new epoch is detected during the start of the
> next task.
> I would like to know the opinion of the community on this approach.
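The proposed protocol can be modeled in a few lines: the driver pushes an epoch bump with fresh block locations via the heartbeat response, and the fetcher consults the refreshed map before retrying a fetch. All names here are illustrative, not Spark's actual classes:

```python
class MapOutputTrackerSketch:
    """Toy model of the proposal: an epoch bump (piggybacked on the
    heartbeat response) invalidates the cached block-location map, so
    a fetch retry against a lost executor can be skipped immediately
    instead of waiting for the next task to notice the new epoch."""

    def __init__(self):
        self.epoch = 0
        self.locations = {}  # block id -> executor id

    def heartbeat_response(self, new_epoch, new_locations):
        # Only a strictly newer epoch replaces the cached locations;
        # stale or duplicate heartbeats are ignored.
        if new_epoch > self.epoch:
            self.epoch = new_epoch
            self.locations = dict(new_locations)

    def should_retry(self, block, executor):
        # Retry a fetch only if the driver still maps the block there.
        return self.locations.get(block) == executor


tracker = MapOutputTrackerSketch()
tracker.heartbeat_response(1, {"shuffle_0_0": "exec-1"})
tracker.heartbeat_response(2, {"shuffle_0_0": "exec-2"})  # exec-1 lost
print(tracker.should_retry("shuffle_0_0", "exec-1"))  # False
```

In this model the retry decision changes as soon as a heartbeat response arrives, which is the latency improvement the issue is after.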
[jira] [Commented] (SPARK-22339) Push epoch updates to executors on fetch failure to avoid fetch retries for missing executors
[ https://issues.apache.org/jira/browse/SPARK-22339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221423#comment-16221423 ]

Apache Spark commented on SPARK-22339:
--------------------------------------

User 'juanrh' has created a pull request for this issue:
https://github.com/apache/spark/pull/19583
[jira] [Assigned] (SPARK-22339) Push epoch updates to executors on fetch failure to avoid fetch retries for missing executors
[ https://issues.apache.org/jira/browse/SPARK-22339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-22339: Assignee: Apache Spark > Push epoch updates to executors on fetch failure to avoid fetch retries for > missing executors > - > > Key: SPARK-22339 > URL: https://issues.apache.org/jira/browse/SPARK-22339 > Project: Spark > Issue Type: Improvement > Components: Shuffle, Spark Core >Affects Versions: 2.2.0 >Reporter: Juan Rodríguez Hortalá >Assignee: Apache Spark > Attachments: push_epoch_update-WIP.diff > > > When a task finishes with error due to a fetch error, then DAGScheduler > unregisters the shuffle blocks hosted by the serving executor (or even all > the executors in the failing host, with external shuffle and > spark.files.fetchFailure.unRegisterOutputOnHost enabled) in the shuffle block > directory stored by MapOutputTracker, that then increments its epoch as a > result. This event is only signaled to the other executors when a new task > with a new epoch starts in each executor. This means that other executors > reading from the failed executors will retry fetching shuffle blocks from > them, even though the driver already knows those executors are lost and those > blocks are now unavailable at those locations. This impacts job runtime, > specially for long shuffles and executor failures at the end of a stage, when > the only pending tasks are shuffle reads. > This could be improved by pushing the epoch update to the executors without > having to wait for a new task. In the attached patch I sketch a possible > solution that sends the updated epoch from the driver to the executors by > piggybacking on the executor heartbeat response. ShuffleBlockFetcherIterator, > RetryingBlockFetcher and BlockFetchingListener are modified so blocks > locations are checked on each fetch retry. 
This doesn't introduce additional > traffic, as MapOutputTrackerWorker.mapStatuses is shared by all tasks running > on the same Executor, and the lookup of the new shuffle blocks directory was > going to happen anyway when the new epoch is detected during the start of the > next task. > I would like to know the opinion of the community on this approach. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
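The piggybacking idea described in the report above can be modeled in a few lines of Python. All names here (DriverTracker, ExecutorTracker) are hypothetical stand-ins for MapOutputTrackerMaster and MapOutputTrackerWorker, not the attached patch: the driver bumps an epoch whenever shuffle outputs are unregistered, and returns the latest epoch with every heartbeat response, so executors can drop stale block locations without waiting for a new task to start.

```python
# Hypothetical model of the proposed protocol; not actual Spark classes.

class DriverTracker:
    def __init__(self):
        self.epoch = 0
        self.map_statuses = {}  # shuffle_id -> {map_id: executor_id}

    def unregister_executor(self, executor_id):
        # Called on fetch failure: drop all outputs hosted by the lost
        # executor and advance the epoch to mark worker caches stale.
        for statuses in self.map_statuses.values():
            for map_id in [m for m, e in statuses.items() if e == executor_id]:
                del statuses[map_id]
        self.epoch += 1

    def heartbeat_response(self):
        # The proposal: every heartbeat reply carries the current epoch.
        return {"epoch": self.epoch}


class ExecutorTracker:
    def __init__(self, driver):
        self.driver = driver
        self.epoch = 0
        self.cached_locations = {"shuffle-0-block-0": "exec-1"}  # stale cache

    def on_heartbeat(self):
        reply = self.driver.heartbeat_response()
        if reply["epoch"] > self.epoch:
            # Newer epoch: invalidate cached locations so the next fetch
            # re-resolves them instead of retrying a known-dead executor.
            self.epoch = reply["epoch"]
            self.cached_locations.clear()
```

As the report argues, this adds no extra traffic of its own: the epoch rides on a reply that is already being sent, and the re-resolution of block locations would have happened anyway at the next task start.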
[jira] [Resolved] (SPARK-22131) Add Mesos Secrets Support to the Mesos Driver
[ https://issues.apache.org/jira/browse/SPARK-22131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marcelo Vanzin resolved SPARK-22131. Resolution: Fixed Assignee: Susan X. Huynh Fix Version/s: 2.3.0 > Add Mesos Secrets Support to the Mesos Driver > - > > Key: SPARK-22131 > URL: https://issues.apache.org/jira/browse/SPARK-22131 > Project: Spark > Issue Type: New Feature > Components: Mesos >Affects Versions: 2.3.0 >Reporter: Arthur Rand >Assignee: Susan X. Huynh > Fix For: 2.3.0 > > > We recently added Secrets support to the Dispatcher (SPARK-20812). In order > to have Driver-to-Executor TLS we need the same support in the Mesos Driver > so a secret can be disseminated to the executors. This JIRA is to move the > current secrets implementation to be used by both frameworks. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22357) SparkContext.binaryFiles ignore minPartitions parameter
[ https://issues.apache.org/jira/browse/SPARK-22357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221388#comment-16221388 ] Ilya Matiach commented on SPARK-22357: -- binaryFiles ignores the number of partitions I want to have, even if I specify the value. I have to repartition the returned DataFrame. In my specific case, the number of partitions was very small, which caused performance issues. I needed to increase the number of partitions by repartitioning the DataFrame after it was constructed, but this can be expensive - it would be better to create the DataFrame with the user-specified number of partitions. > SparkContext.binaryFiles ignore minPartitions parameter > --- > > Key: SPARK-22357 > URL: https://issues.apache.org/jira/browse/SPARK-22357 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.2, 2.2.0 >Reporter: Weichen Xu > > this is a bug in binaryFiles - even though we give it the partitions, > binaryFiles ignores it. > This is a bug introduced in spark 2.1 from spark 2.0, in file > PortableDataStream.scala the argument “minPartitions” is no longer used (with > the push to master on 11/7/6): > {code} > /** > Allow minPartitions set by end-user in order to keep compatibility with old > Hadoop API > which is set through setMaxSplitSize > */ > def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: > Int) { > val defaultMaxSplitBytes = > sc.getConf.get(config.FILES_MAX_PARTITION_BYTES) > val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES) > val defaultParallelism = sc.defaultParallelism > val files = listStatus(context).asScala > val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + > openCostInBytes).sum > val bytesPerCore = totalBytes / defaultParallelism > val maxSplitSize = Math.min(defaultMaxSplitBytes, > Math.max(openCostInBytes, bytesPerCore)) > super.setMaxSplitSize(maxSplitSize) > } > {code} > The code previously, in version 2.0, was: > {code} > def setMinPartitions(context: JobContext, minPartitions: Int) { > val totalLen = > listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum > val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, > 1.0)).toLong > super.setMaxSplitSize(maxSplitSize) > } > {code} > The new code is very smart, but it ignores what the user passes in and uses > the data size, which is kind of a breaking change in some sense. > In our specific case this was a problem, because we initially read in just > the file names and only after that the dataframe becomes very large, when > reading in the images themselves – and in this case the new code does not > handle the partitioning very well. > I’m not sure if it can be easily fixed because I don’t understand the full > context of the change in spark (but at the very least the unused parameter > should be removed to avoid confusion). -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
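The regression is easiest to see by transcribing both quoted formulas into plain Python. The constants mirror Spark's defaults but are illustrative here, and the function names are invented for this sketch:

```python
import math

FILES_MAX_PARTITION_BYTES = 128 * 1024 * 1024  # 128 MB
FILES_OPEN_COST_IN_BYTES = 4 * 1024 * 1024     # 4 MB

def max_split_size_2_0(file_lengths, min_partitions):
    # Spark 2.0 formula: the split size shrinks as minPartitions grows.
    total_len = sum(file_lengths)
    return int(math.ceil(total_len / max(min_partitions, 1.0)))

def max_split_size_2_1(file_lengths, default_parallelism, min_partitions=1):
    # Spark 2.1 formula: minPartitions is accepted but never read.
    total_bytes = sum(n + FILES_OPEN_COST_IN_BYTES for n in file_lengths)
    bytes_per_core = total_bytes // default_parallelism
    return min(FILES_MAX_PARTITION_BYTES,
               max(FILES_OPEN_COST_IN_BYTES, bytes_per_core))

# Ten tiny files; the user asks for 100 partitions.
files = [1024] * 10
old = max_split_size_2_0(files, min_partitions=100)     # honors the request
new = max_split_size_2_1(files, 4, min_partitions=100)  # request ignored
```

With the 2.0 formula the requested 100 partitions yield a proportionally small split size; with the 2.1 formula the result is identical no matter what minPartitions is passed, which is exactly the reported behavior.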
[jira] [Assigned] (SPARK-20644) Hook up Spark UI to the new key-value store backend
[ https://issues.apache.org/jira/browse/SPARK-20644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20644: Assignee: Apache Spark > Hook up Spark UI to the new key-value store backend > --- > > Key: SPARK-20644 > URL: https://issues.apache.org/jira/browse/SPARK-20644 > Project: Spark > Issue Type: Sub-task > Components: Web UI >Affects Versions: 2.3.0 >Reporter: Marcelo Vanzin >Assignee: Apache Spark > > See spec in parent issue (SPARK-18085) for more details. > This task tracks hooking up the Spark UI (both live and SHS) to the key-value > store based backend. It's the initial work to allow individual UI pages to be > de-coupled from the listener implementations and use the REST API data saved > in the store. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-20644) Hook up Spark UI to the new key-value store backend
[ https://issues.apache.org/jira/browse/SPARK-20644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20644: Assignee: (was: Apache Spark) > Hook up Spark UI to the new key-value store backend > --- > > Key: SPARK-20644 > URL: https://issues.apache.org/jira/browse/SPARK-20644 > Project: Spark > Issue Type: Sub-task > Components: Web UI >Affects Versions: 2.3.0 >Reporter: Marcelo Vanzin > > See spec in parent issue (SPARK-18085) for more details. > This task tracks hooking up the Spark UI (both live and SHS) to the key-value > store based backend. It's the initial work to allow individual UI pages to be > de-coupled from the listener implementations and use the REST API data saved > in the store. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20644) Hook up Spark UI to the new key-value store backend
[ https://issues.apache.org/jira/browse/SPARK-20644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221372#comment-16221372 ] Apache Spark commented on SPARK-20644: -- User 'vanzin' has created a pull request for this issue: https://github.com/apache/spark/pull/19582 > Hook up Spark UI to the new key-value store backend > --- > > Key: SPARK-20644 > URL: https://issues.apache.org/jira/browse/SPARK-20644 > Project: Spark > Issue Type: Sub-task > Components: Web UI >Affects Versions: 2.3.0 >Reporter: Marcelo Vanzin > > See spec in parent issue (SPARK-18085) for more details. > This task tracks hooking up the Spark UI (both live and SHS) to the key-value > store based backend. It's the initial work to allow individual UI pages to be > de-coupled from the listener implementations and use the REST API data saved > in the store. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-22366) Support ignoreMissingFiles flag parallel to ignoreCorruptFiles
[ https://issues.apache.org/jira/browse/SPARK-22366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-22366: - Component/s: (was: Spark Core) SQL > Support ignoreMissingFiles flag parallel to ignoreCorruptFiles > -- > > Key: SPARK-22366 > URL: https://issues.apache.org/jira/browse/SPARK-22366 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Jose Torres >Priority: Minor > > There's an existing flag "spark.sql.files.ignoreCorruptFiles" that will > quietly ignore attempted reads from files that have been corrupted, but it > still allows the query to fail on missing files. Being able to ignore missing > files too is useful in some replication scenarios. > We should add a "spark.sql.files.ignoreMissingFiles" to fill out the > functionality. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-22366) Support ignoreMissingFiles flag parallel to ignoreCorruptFiles
[ https://issues.apache.org/jira/browse/SPARK-22366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-22366: - Issue Type: Improvement (was: Bug) > Support ignoreMissingFiles flag parallel to ignoreCorruptFiles > -- > > Key: SPARK-22366 > URL: https://issues.apache.org/jira/browse/SPARK-22366 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Jose Torres >Priority: Minor > > There's an existing flag "spark.sql.files.ignoreCorruptFiles" that will > quietly ignore attempted reads from files that have been corrupted, but it > still allows the query to fail on missing files. Being able to ignore missing > files too is useful in some replication scenarios. > We should add a "spark.sql.files.ignoreMissingFiles" to fill out the > functionality. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22366) Support ignoreMissingFiles flag parallel to ignoreCorruptFiles
[ https://issues.apache.org/jira/browse/SPARK-22366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221225#comment-16221225 ] Apache Spark commented on SPARK-22366: -- User 'joseph-torres' has created a pull request for this issue: https://github.com/apache/spark/pull/19581 > Support ignoreMissingFiles flag parallel to ignoreCorruptFiles > -- > > Key: SPARK-22366 > URL: https://issues.apache.org/jira/browse/SPARK-22366 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Jose Torres >Priority: Minor > > There's an existing flag "spark.sql.files.ignoreCorruptFiles" that will > quietly ignore attempted reads from files that have been corrupted, but it > still allows the query to fail on missing files. Being able to ignore missing > files too is useful in some replication scenarios. > We should add a "spark.sql.files.ignoreMissingFiles" to fill out the > functionality. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-22366) Support ignoreMissingFiles flag parallel to ignoreCorruptFiles
[ https://issues.apache.org/jira/browse/SPARK-22366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-22366: Assignee: Apache Spark > Support ignoreMissingFiles flag parallel to ignoreCorruptFiles > -- > > Key: SPARK-22366 > URL: https://issues.apache.org/jira/browse/SPARK-22366 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Jose Torres >Assignee: Apache Spark >Priority: Minor > > There's an existing flag "spark.sql.files.ignoreCorruptFiles" that will > quietly ignore attempted reads from files that have been corrupted, but it > still allows the query to fail on missing files. Being able to ignore missing > files too is useful in some replication scenarios. > We should add a "spark.sql.files.ignoreMissingFiles" to fill out the > functionality. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-22366) Support ignoreMissingFiles flag parallel to ignoreCorruptFiles
[ https://issues.apache.org/jira/browse/SPARK-22366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-22366: Assignee: (was: Apache Spark) > Support ignoreMissingFiles flag parallel to ignoreCorruptFiles > -- > > Key: SPARK-22366 > URL: https://issues.apache.org/jira/browse/SPARK-22366 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Jose Torres >Priority: Minor > > There's an existing flag "spark.sql.files.ignoreCorruptFiles" that will > quietly ignore attempted reads from files that have been corrupted, but it > still allows the query to fail on missing files. Being able to ignore missing > files too is useful in some replication scenarios. > We should add a "spark.sql.files.ignoreMissingFiles" to fill out the > functionality. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-22366) Support ignoreMissingFiles flag parallel to ignoreCorruptFiles
Jose Torres created SPARK-22366: --- Summary: Support ignoreMissingFiles flag parallel to ignoreCorruptFiles Key: SPARK-22366 URL: https://issues.apache.org/jira/browse/SPARK-22366 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.2.0 Reporter: Jose Torres Priority: Minor There's an existing flag "spark.sql.files.ignoreCorruptFiles" that will quietly ignore attempted reads from files that have been corrupted, but it still allows the query to fail on missing files. Being able to ignore missing files too is useful in some replication scenarios. We should add a "spark.sql.files.ignoreMissingFiles" to fill out the functionality. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
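As a rough stdlib analogy of the proposed semantics (this is not Spark code; the function is invented for illustration), the new flag would turn a hard failure on a vanished input into a quiet skip, mirroring what ignoreCorruptFiles already does for unreadable files:

```python
from pathlib import Path

def read_files(paths, ignore_missing=False):
    """Stdlib analogy for the proposed flag: with ignore_missing=True a
    vanished input is skipped quietly; without it the whole read fails,
    as Spark does today when a file listed at planning time is gone."""
    contents = []
    for p in paths:
        path = Path(p)
        if not path.exists():
            if ignore_missing:
                continue  # quietly drop the missing input
            raise FileNotFoundError(p)
        contents.append(path.read_text())
    return contents
```

The replication scenario in the report is the motivating case: files can legitimately disappear between listing and reading, and the query should be able to proceed with what remains.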
[jira] [Resolved] (SPARK-22328) ClosureCleaner misses referenced superclass fields, gives them null values
[ https://issues.apache.org/jira/browse/SPARK-22328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-22328. - Resolution: Fixed Fix Version/s: 2.3.0 2.2.1 Issue resolved by pull request 19556 [https://github.com/apache/spark/pull/19556] > ClosureCleaner misses referenced superclass fields, gives them null values > -- > > Key: SPARK-22328 > URL: https://issues.apache.org/jira/browse/SPARK-22328 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Ryan Williams > Fix For: 2.2.1, 2.3.0 > > > [Runnable repro > here|https://github.com/ryan-williams/spark-bugs/tree/closure]: > Superclass with some fields: > {code} > abstract class App extends Serializable { > // SparkContext stub > @transient lazy val sc = new SparkContext(new > SparkConf().setAppName("test").setMaster("local[4]").set("spark.ui.showConsoleProgress", > "false")) > // These fields get missed by the ClosureCleaner in some situations > val n1 = 111 > val s1 = "aaa" > // Simple scaffolding to exercise passing a closure to RDD.foreach in > subclasses > def rdd = sc.parallelize(1 to 1) > def run(name: String): Unit = { > print(s"$name:\t") > body() > sc.stop() > } > def body(): Unit > } > {code} > Running a simple Spark job with various instantiations of this class: > {code} > object Main { > /** [[App]]s generated this way will not correctly detect references to > [[App.n1]] in Spark closures */ > val fn = () ⇒ new App { > val n2 = 222 > val s2 = "bbb" > def body(): Unit = rdd.foreach { _ ⇒ println(s"$n1, $n2, $s1, $s2") } > } > /** Doesn't serialize closures correctly */ > val app1 = fn() > /** Works fine */ > val app2 = > new App { > val n2 = 222 > val s2 = "bbb" > def body(): Unit = rdd.foreach { _ ⇒ println(s"$n1, $n2, $s1, $s2") } > } > /** [[App]]s created this way also work fine */ > def makeApp(): App = > new App { > val n2 = 222 > val s2 = "bbb" > def body(): Unit = rdd.foreach { _ ⇒ println(s"$n1, $n2, $s1, $s2") 
} > } > val app3 = makeApp() // ok > val fn2 = () ⇒ makeApp() // ok > def main(args: Array[String]): Unit = { > fn().run("fn")// bad: n1 → 0, s1 → null > app1.run("app1") // bad: n1 → 0, s1 → null > app2.run("app2") // ok > app3.run("app3") // ok > fn2().run("fn2") // ok > } > } > {code} > Build + Run: > {code} > $ sbt run > … > fn: 0, 222, null, bbb > app1: 0, 222, null, bbb > app2: 111, 222, aaa, bbb > app3: 111, 222, aaa, bbb > fn2: 111, 222, aaa, bbb > {code} > The first two versions have {{0}} and {{null}}, resp., for the {{A.n1}} and > {{A.s1}} fields. > Something about this syntax causes the problem: > {code} > () => new App { … } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-22328) ClosureCleaner misses referenced superclass fields, gives them null values
[ https://issues.apache.org/jira/browse/SPARK-22328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan reassigned SPARK-22328: --- Assignee: Liang-Chi Hsieh > ClosureCleaner misses referenced superclass fields, gives them null values > -- > > Key: SPARK-22328 > URL: https://issues.apache.org/jira/browse/SPARK-22328 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Ryan Williams >Assignee: Liang-Chi Hsieh > Fix For: 2.2.1, 2.3.0 > > > [Runnable repro > here|https://github.com/ryan-williams/spark-bugs/tree/closure]: > Superclass with some fields: > {code} > abstract class App extends Serializable { > // SparkContext stub > @transient lazy val sc = new SparkContext(new > SparkConf().setAppName("test").setMaster("local[4]").set("spark.ui.showConsoleProgress", > "false")) > // These fields get missed by the ClosureCleaner in some situations > val n1 = 111 > val s1 = "aaa" > // Simple scaffolding to exercise passing a closure to RDD.foreach in > subclasses > def rdd = sc.parallelize(1 to 1) > def run(name: String): Unit = { > print(s"$name:\t") > body() > sc.stop() > } > def body(): Unit > } > {code} > Running a simple Spark job with various instantiations of this class: > {code} > object Main { > /** [[App]]s generated this way will not correctly detect references to > [[App.n1]] in Spark closures */ > val fn = () ⇒ new App { > val n2 = 222 > val s2 = "bbb" > def body(): Unit = rdd.foreach { _ ⇒ println(s"$n1, $n2, $s1, $s2") } > } > /** Doesn't serialize closures correctly */ > val app1 = fn() > /** Works fine */ > val app2 = > new App { > val n2 = 222 > val s2 = "bbb" > def body(): Unit = rdd.foreach { _ ⇒ println(s"$n1, $n2, $s1, $s2") } > } > /** [[App]]s created this way also work fine */ > def makeApp(): App = > new App { > val n2 = 222 > val s2 = "bbb" > def body(): Unit = rdd.foreach { _ ⇒ println(s"$n1, $n2, $s1, $s2") } > } > val app3 = makeApp() // ok > val fn2 = () ⇒ makeApp() // ok > 
def main(args: Array[String]): Unit = { > fn().run("fn")// bad: n1 → 0, s1 → null > app1.run("app1") // bad: n1 → 0, s1 → null > app2.run("app2") // ok > app3.run("app3") // ok > fn2().run("fn2") // ok > } > } > {code} > Build + Run: > {code} > $ sbt run > … > fn: 0, 222, null, bbb > app1: 0, 222, null, bbb > app2: 111, 222, aaa, bbb > app3: 111, 222, aaa, bbb > fn2: 111, 222, aaa, bbb > {code} > The first two versions have {{0}} and {{null}}, resp., for the {{A.n1}} and > {{A.s1}} fields. > Something about this syntax causes the problem: > {code} > () => new App { … } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22357) SparkContext.binaryFiles ignore minPartitions parameter
[ https://issues.apache.org/jira/browse/SPARK-22357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221078#comment-16221078 ] Bo Meng commented on SPARK-22357: - A quick fix could be as follows, correct me if I am wrong: val defaultParallelism = Math.max(sc.defaultParallelism, minPartitions) > SparkContext.binaryFiles ignore minPartitions parameter > --- > > Key: SPARK-22357 > URL: https://issues.apache.org/jira/browse/SPARK-22357 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.2, 2.2.0 >Reporter: Weichen Xu > > this is a bug in binaryFiles - even though we give it the partitions, > binaryFiles ignores it. > This is a bug introduced in spark 2.1 from spark 2.0, in file > PortableDataStream.scala the argument “minPartitions” is no longer used (with > the push to master on 11/7/6): > {code} > /** > Allow minPartitions set by end-user in order to keep compatibility with old > Hadoop API > which is set through setMaxSplitSize > */ > def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: > Int) { > val defaultMaxSplitBytes = > sc.getConf.get(config.FILES_MAX_PARTITION_BYTES) > val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES) > val defaultParallelism = sc.defaultParallelism > val files = listStatus(context).asScala > val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + > openCostInBytes).sum > val bytesPerCore = totalBytes / defaultParallelism > val maxSplitSize = Math.min(defaultMaxSplitBytes, > Math.max(openCostInBytes, bytesPerCore)) > super.setMaxSplitSize(maxSplitSize) > } > {code} > The code previously, in version 2.0, was: > {code} > def setMinPartitions(context: JobContext, minPartitions: Int) { > val totalLen = > listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum > val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, > 1.0)).toLong > super.setMaxSplitSize(maxSplitSize) > } > {code} > The new code is very smart, but it ignores what the user passes in and uses > the data size, which is kind of a breaking change in some sense. > In our specific case this was a problem, because we initially read in just > the file names and only after that the dataframe becomes very large, when > reading in the images themselves – and in this case the new code does not > handle the partitioning very well. > I’m not sure if it can be easily fixed because I don’t understand the full > context of the change in spark (but at the very least the unused parameter > should be removed to avoid confusion). -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
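The suggested one-line change can be checked by transcribing the 2.1 formula into plain Python (illustrative names and constants, not the actual patch): raising the effective parallelism to at least minPartitions shrinks bytesPerCore and hence each split.

```python
FILES_MAX_PARTITION_BYTES = 128 * 1024 * 1024  # illustrative defaults
FILES_OPEN_COST_IN_BYTES = 4 * 1024 * 1024

def max_split_size(file_lengths, default_parallelism, min_partitions=1):
    # Suggested change: take the max of the cluster parallelism and the
    # user's minPartitions before computing bytes per core.
    parallelism = max(default_parallelism, min_partitions)
    total_bytes = sum(n + FILES_OPEN_COST_IN_BYTES for n in file_lengths)
    bytes_per_core = total_bytes // parallelism
    return min(FILES_MAX_PARTITION_BYTES,
               max(FILES_OPEN_COST_IN_BYTES, bytes_per_core))
```

Note that the openCostInBytes floor still bounds how small a split can get, so very large minPartitions values eventually stop having an effect; the comment above sketches a direction rather than a complete fix.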
[jira] [Commented] (SPARK-20928) SPIP: Continuous Processing Mode for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221025#comment-16221025 ] Cody Koeninger commented on SPARK-20928: No, it doesn't exist yet as far as I know. Reason I ask is that Michael had said on the dev list in September "I think that we are going to have to change the Sink API as part of SPARK-20928, which is why I linked these tickets together." For aggregates, conceptually I think that the minimum and maximum per partition kafka offset for any data involved in the aggregate is sufficient to identify it. But it seems like map-only is the bigger focus here, which is probably fine. > SPIP: Continuous Processing Mode for Structured Streaming > - > > Key: SPARK-20928 > URL: https://issues.apache.org/jira/browse/SPARK-20928 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Michael Armbrust > Labels: SPIP > Attachments: Continuous Processing in Structured Streaming Design > Sketch.pdf > > > Given the current Source API, the minimum possible latency for any record is > bounded by the amount of time that it takes to launch a task. This > limitation is a result of the fact that {{getBatch}} requires us to know both > the starting and the ending offset, before any tasks are launched. In the > worst case, the end-to-end latency is actually closer to the average batch > time + task launching time. > For applications where latency is more important than exactly-once output > however, it would be useful if processing could happen continuously. This > would allow us to achieve fully pipelined reading and writing from sources > such as Kafka. This kind of architecture would make it possible to process > records with end-to-end latencies on the order of 1 ms, rather than the > 10-100ms that is possible today. 
> One possible architecture here would be to change the Source API to look like > the following rough sketch: > {code} > trait Epoch { > def data: DataFrame > /** The exclusive starting position for `data`. */ > def startOffset: Offset > /** The inclusive ending position for `data`. Incrementally updated > during processing, but not complete until execution of the query plan in > `data` is finished. */ > def endOffset: Offset > } > def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], > limits: Limits): Epoch > {code} > The above would allow us to build an alternative implementation of > {{StreamExecution}} that processes continuously with much lower latency and > only stops processing when needing to reconfigure the stream (either due to a > failure or a user requested change in parallelism. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
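The open-ended Epoch sketched in the quoted design can be rendered as a small Python model (a list of records stands in for the DataFrame, and the Limits argument is omitted). It shows the key difference from today's getBatch: the end offset is not known when the epoch starts and is filled in incrementally while data flows.

```python
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class Epoch:
    # Illustrative model of the quoted trait, not Spark's API.
    data: List[str] = field(default_factory=list)  # stand-in for the DataFrame
    start_offset: int = -1             # exclusive starting position
    end_offset: Optional[int] = None   # inclusive; set during processing

def get_batch(records, start_offset):
    # Open-ended read: begin right after start_offset without knowing
    # in advance where the epoch will end.
    epoch = Epoch(start_offset=start_offset)
    for offset, rec in enumerate(records):
        if offset <= start_offset:
            continue
        epoch.data.append(rec)
        epoch.end_offset = offset  # incrementally updated, per the sketch
    return epoch
```

In a real continuous implementation the loop above would run against a live source, with end_offset only final once the query plan over `data` has finished executing.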
[jira] [Commented] (SPARK-22184) GraphX fails in case of insufficient memory and checkpoints enabled
[ https://issues.apache.org/jira/browse/SPARK-22184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221019#comment-16221019 ] Sergey Zhemzhitsky commented on SPARK-22184: Hello guys, is there a chance for this issue to be looked through as well as the corresponding PR? It would be really great for the fix to be included into spark 2.2.1/2.3.0. > GraphX fails in case of insufficient memory and checkpoints enabled > --- > > Key: SPARK-22184 > URL: https://issues.apache.org/jira/browse/SPARK-22184 > Project: Spark > Issue Type: Bug > Components: GraphX >Affects Versions: 2.2.0 > Environment: spark 2.2.0 > scala 2.11 >Reporter: Sergey Zhemzhitsky > > GraphX fails with FileNotFoundException in case of insufficient memory when > checkpoints are enabled. > Here is the stacktrace > {code} > Job aborted due to stage failure: Task creation failed: > java.io.FileNotFoundException: File > file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0 > does not exist > java.io.FileNotFoundException: File > file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0 > does not exist > at > org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539) > at > org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752) > at > org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529) > at > org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409) > at > org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89) > at > org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274) > at > org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274) > at scala.Option.map(Option.scala:146) > at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274) > at > 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1697) > ... > {code} > As GraphX uses cached RDDs intensively, the issue is only reproducible when > previously cached and checkpointed Vertex and Edge RDDs are evicted from > memory and forced to be read from disk. > For testing purposes the following parameters may be set to emulate low > memory environment > {code} > val sparkConf = new SparkConf() > .set("spark.graphx.pregel.checkpointInterval", "2") > // set testing memory to evict cached RDDs from it and force > // reading checkpointed RDDs from disk > .set("spark.testing.reservedMemory", "128") > .set("spark.testing.memory", "256") > {code} > This issue also includes SPARK-22150 and cannot be fixed until SPARK-22150 is > fixed too. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22150) PeriodicCheckpointer fails with FileNotFoundException in case of dependant RDDs
[ https://issues.apache.org/jira/browse/SPARK-22150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221018#comment-16221018 ] Sergey Zhemzhitsky commented on SPARK-22150: Hello guys, is there a chance for this issue to be looked through as well as the corresponding PR? It would be really great for the fix to be included into spark 2.2.1/2.3.0. > PeriodicCheckpointer fails with FileNotFoundException in case of dependant > RDDs > --- > > Key: SPARK-22150 > URL: https://issues.apache.org/jira/browse/SPARK-22150 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, > 2.0.1, 2.0.2, 2.1.0, 2.1.1, 2.2.0 > Environment: spark 2.2.0 > scala 2.11 >Reporter: Sergey Zhemzhitsky > > PeriodicCheckpointer fails with FileNotFoundException in case of > checkpointing dependant RDDs (consider iterative algorithms), i.e. when the > RDD to checkpoint depends on already checkpointed RDD. > Here is the exception > {code} > Job aborted due to stage failure: Task creation failed: > java.io.FileNotFoundException: File > file:/tmp/spark-e2046e49-5d8a-4633-b525-52a00bab32f0/17d9b542-d48d-4f23-b368-3957a42644a6/rdd-20/part-0 > does not exist > java.io.FileNotFoundException: File > file:/tmp/spark-e2046e49-5d8a-4633-b525-52a00bab32f0/17d9b542-d48d-4f23-b368-3957a42644a6/rdd-20/part-0 > does not exist > at > org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539) > at > org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752) > at > org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529) > at > org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409) > at > org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89) > at > org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274) > at > 
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274) > at scala.Option.map(Option.scala:146) > at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274) > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1697) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1708) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1707) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1707) > at scala.collection.immutable.List.foreach(List.scala:381) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1707) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1705) > at scala.collection.immutable.List.foreach(List.scala:381) > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1705) > at > org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1671) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:989) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:987) > {code} > The issue seems to be in this [piece of > code|https://github.com/apache/spark/blob/0a7f5f2798b6e8b2ba15e8b3aa07d5953ad1c695/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala#L94] > {code:java} > if (checkpointInterval != -1 && (updateCount % checkpointInterval) == 0 > && 
sc.getCheckpointDir.nonEmpty) { > // Add new checkpoint before removing old checkpoints. > checkpoint(newData) > checkpointQueue.enqueue(newData) > // Remove checkpoints before the latest one. > var canDelete = true > while (checkpointQueue.size > 1 && canDelete) { > // Delete the oldest checkpoint only if the next checkpoint exists. > if (isCheckpointed(checkpointQueue.head)) { > removeCheckpointFile() > } else { > canDelete = false > } > } > } > {code} > Given that _checkpointQueue.head_ is checkpointed and materialized and > _newData_ depends on _checkpointQueue.head_, then the exception happens on > action of
[jira] [Commented] (SPARK-20928) SPIP: Continuous Processing Mode for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220956#comment-16220956 ] Reynold Xin commented on SPARK-20928: - That doesn't yet exist does it? How would that work for non-map jobs, e.g. an aggregate? That said, if it is for map-only, this can be tweaked to pass the offset ranges in addition to epoch id. > SPIP: Continuous Processing Mode for Structured Streaming > - > > Key: SPARK-20928 > URL: https://issues.apache.org/jira/browse/SPARK-20928 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Michael Armbrust > Labels: SPIP > Attachments: Continuous Processing in Structured Streaming Design > Sketch.pdf > > > Given the current Source API, the minimum possible latency for any record is > bounded by the amount of time that it takes to launch a task. This > limitation is a result of the fact that {{getBatch}} requires us to know both > the starting and the ending offset, before any tasks are launched. In the > worst case, the end-to-end latency is actually closer to the average batch > time + task launching time. > For applications where latency is more important than exactly-once output > however, it would be useful if processing could happen continuously. This > would allow us to achieve fully pipelined reading and writing from sources > such as Kafka. This kind of architecture would make it possible to process > records with end-to-end latencies on the order of 1 ms, rather than the > 10-100ms that is possible today. > One possible architecture here would be to change the Source API to look like > the following rough sketch: > {code} > trait Epoch { > def data: DataFrame > /** The exclusive starting position for `data`. */ > def startOffset: Offset > /** The inclusive ending position for `data`. Incrementally updated > during processing, but not complete until execution of the query plan in > `data` is finished. 
*/ > def endOffset: Offset > } > def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], > limits: Limits): Epoch > {code} > The above would allow us to build an alternative implementation of > {{StreamExecution}} that processes continuously with much lower latency and > only stops processing when needing to reconfigure the stream (either due to a > failure or a user-requested change in parallelism). -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
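Setting Spark aside, the epoch contract in the sketch above can be illustrated with a toy model. All names here (`Epoch`, `get_batch`, the integer offsets) are hypothetical stand-ins for the proposed Source API, not actual Spark code:

```python
from dataclasses import dataclass
from typing import List, Optional

# Toy model of the proposed epoch-based Source contract: each epoch carries
# its data plus an exclusive start offset and an inclusive end offset.
@dataclass
class Epoch:
    data: List[str]
    start_offset: int  # exclusive: first record is at start_offset + 1
    end_offset: int    # inclusive: last record read

def get_batch(log: List[str], start: Optional[int], limit: int) -> Epoch:
    """Return the next epoch after `start`, reading at most `limit` records."""
    begin = 0 if start is None else start + 1
    chunk = log[begin:begin + limit]
    return Epoch(data=chunk,
                 start_offset=begin - 1,
                 end_offset=begin + len(chunk) - 1)

log = ["a", "b", "c", "d", "e"]
e1 = get_batch(log, None, 2)           # first epoch: reads "a", "b"
e2 = get_batch(log, e1.end_offset, 2)  # resumes exactly where e1 ended
```

Because the end offset is only known once the epoch finishes, a continuous `StreamExecution` loop would keep calling `get_batch` with the previous epoch's `end_offset`, never needing both boundaries up front.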
[jira] [Commented] (SPARK-22344) Prevent R CMD check from using /tmp
[ https://issues.apache.org/jira/browse/SPARK-22344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220884#comment-16220884 ] Shivaram Venkataraman commented on SPARK-22344: --- Thanks for investigating. Is `/tmp/ubuntu/8201eb2c-8065-458c-b564-1e61b3dc5b7d/` a symlink to `/tmp/hive/ubuntu/8201eb2c-8065-458c-b564-1e61b3dc5b7d/` or are they just different directories? We can disable hsperfdata with the suggested flag and also change java.io.tmpdir, which should at least fix the BlockManager, I think. I will open a PR for this. Regarding the Hive directories created even though it's off, I have no idea why that is happening. [~falaki] [~hyukjin.kwon] do you have any idea why this happens? > Prevent R CMD check from using /tmp > --- > > Key: SPARK-22344 > URL: https://issues.apache.org/jira/browse/SPARK-22344 > Project: Spark > Issue Type: Bug > Components: SparkR >Affects Versions: 1.6.3, 2.1.2, 2.2.0, 2.3.0 >Reporter: Shivaram Venkataraman > > When R CMD check is run on the SparkR package it leaves behind files in /tmp > which is a violation of CRAN policy. We should instead write to Rtmpdir. > Notes from CRAN are below > {code} > Checking this leaves behind dirs >hive/$USER >$USER > and files named like >b4f6459b-0624-4100-8358-7aa7afbda757_resources > in /tmp, in violation of the CRAN Policy. > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
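The two mitigations discussed above can be sketched as a shell/config fragment. `-Djava.io.tmpdir` and `-XX:-UsePerfData` are standard HotSpot options and `_JAVA_OPTIONS` is a standard way to inject them, but `SPARKR_TMPDIR` is a hypothetical stand-in for R's `tempdir()`, and whether this fully silences the CRAN check is exactly what the proposed PR would need to verify:

```shell
# Sketch only: point the JVM's temp dir at a per-session directory and
# disable hsperfdata, so the JVM itself writes nothing to /tmp.
SPARKR_TMPDIR="$(mktemp -d)"   # stand-in for R's tempdir() / Rtmpdir
export _JAVA_OPTIONS="-Djava.io.tmpdir=${SPARKR_TMPDIR} -XX:-UsePerfData"
```

This covers files created by the JVM (BlockManager scratch dirs, hsperfdata), but not directories that Hive-related code creates with its own hard-coded paths, which is the open question in the comment above.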
[jira] [Commented] (SPARK-11334) numRunningTasks can't be less than 0, or it will affect executor allocation
[ https://issues.apache.org/jira/browse/SPARK-11334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220881#comment-16220881 ] Apache Spark commented on SPARK-11334: -- User 'sitalkedia' has created a pull request for this issue: https://github.com/apache/spark/pull/19580 > numRunningTasks can't be less than 0, or it will affect executor allocation > --- > > Key: SPARK-11334 > URL: https://issues.apache.org/jira/browse/SPARK-11334 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.4.0 >Reporter: meiyoula >Assignee: meiyoula > > With the *Dynamic Allocation* feature, when a task fails more than *maxFailure* times, all > the dependent jobs, stages and tasks will be killed or aborted. In this process, the > *SparkListenerTaskEnd* event arrives after *SparkListenerStageCompleted* > and *SparkListenerJobEnd*, like the Event Log below: > {code} > {"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":20,"Stage > Attempt ID":0,"Stage Name":"run at AccessController.java:-2","Number of > Tasks":200} > {"Event":"SparkListenerJobEnd","Job ID":9,"Completion Time":1444914699829} > {"Event":"SparkListenerTaskEnd","Stage ID":20,"Stage Attempt ID":0,"Task > Type":"ResultTask","Task End Reason":{"Reason":"TaskKilled"},"Task > Info":{"Task ID":1955,"Index":88,"Attempt":2,"Launch > Time":1444914699763,"Executor > ID":"5","Host":"linux-223","Locality":"PROCESS_LOCAL","Speculative":false,"Getting > Result Time":0,"Finish Time":1444914699864,"Failed":true,"Accumulables":[]}} > {code} > Because of that, the *numRunningTasks* in the *ExecutorAllocationManager* class can > become less than 0, which affects executor allocation. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
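The bookkeeping hazard described above — a late `SparkListenerTaskEnd` decrementing a counter that stage completion already reset — can be sketched in miniature. This is plain Python with hypothetical names, not the actual `ExecutorAllocationManager` code:

```python
class TaskCounter:
    """Toy sketch of the listener bookkeeping problem: if a task-end event
    arrives after the stage was already marked complete (which reset the
    count), naively decrementing drives the running-task count negative."""

    def __init__(self):
        self.num_running_tasks = 0

    def on_task_start(self):
        self.num_running_tasks += 1

    def on_stage_completed(self):
        # Stage cleanup resets the counter for that stage.
        self.num_running_tasks = 0

    def on_task_end(self):
        # Guard against out-of-order events instead of blindly decrementing.
        self.num_running_tasks = max(0, self.num_running_tasks - 1)

c = TaskCounter()
c.on_task_start()
c.on_stage_completed()  # out of order: stage ends before the task-end event
c.on_task_end()         # without the max(0, ...) guard this would reach -1
```

A negative count matters because the allocation manager sizes its executor request from the number of running plus pending tasks; a negative running count makes it under-request, which is the "stuck with no executors" symptom in SPARK-22312.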
[jira] [Commented] (SPARK-22312) Spark job stuck with no executor due to bug in Executor Allocation Manager
[ https://issues.apache.org/jira/browse/SPARK-22312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220857#comment-16220857 ] Apache Spark commented on SPARK-22312: -- User 'sitalkedia' has created a pull request for this issue: https://github.com/apache/spark/pull/19580 > Spark job stuck with no executor due to bug in Executor Allocation Manager > -- > > Key: SPARK-22312 > URL: https://issues.apache.org/jira/browse/SPARK-22312 > Project: Spark > Issue Type: Bug > Components: Scheduler >Affects Versions: 2.2.0 >Reporter: Sital Kedia > > We often see Spark jobs get stuck because, when dynamic allocation is turned on, the Executor > Allocation Manager does not request any executors even though there are pending tasks. > Looking at the logic in the EAM that calculates running tasks, the calculation can go > wrong and the number of running tasks can become negative. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-22364) unix_timestamp function sets valid dates to null
[ https://issues.apache.org/jira/browse/SPARK-22364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-22364. --- Resolution: Not A Problem These functions largely mimic Hive on purpose, and I think that's just how unix_timestamp("...") is defined to behave. I agree about timezones, and so would virtually always recommend you work in timestamps at the app level, not date strings without timezones. If you have a date string with a timezone you should be able to parse it unambiguously with to_date or to_timestamp. > unix_timestamp function sets valid dates to null > > > Key: SPARK-22364 > URL: https://issues.apache.org/jira/browse/SPARK-22364 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 > Environment: Windows 10, United Kingdom >Reporter: Matthew Sinton-Hewitt > > org.apache.spark.sql.functions.unix_timestamp sets some valid dates to null. > The dates happen to be at the start of Daylight Savings time (UK and > possibly elsewhere). > {code:java} > val spark = SparkSession.builder.getOrCreate() > import spark.implicits._ > spark.sparkContext.parallelize( >Seq("25/03/2012 00:59", "25/03/2012 01:00", "25/03/2012 01:59", > "25/03/2012 02:01")) > .toDF("date") > .select(unix_timestamp($"date", "dd/MM/yyyy HH:mm")) > .show(false) > // results: > // 1332637140, null, null, 1332637260 > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
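The null results line up with the UK's spring-forward transition: on 25 March 2012 clocks jumped from 01:00 directly to 02:00, so wall-clock times in [01:00, 02:00) never existed that day, which is why a lenient parser maps them to null. A plain-Python sketch (no Spark involved) makes the gap visible:

```python
from datetime import datetime
from zoneinfo import ZoneInfo

london = ZoneInfo("Europe/London")

# 01:30 local time on 25 March 2012 is inside the DST gap: clocks went
# straight from 01:00 GMT to 02:00 BST, so this wall-clock time never existed.
local = datetime(2012, 3, 25, 1, 30, tzinfo=london)

# Round-tripping through UTC projects the nonexistent wall time onto the
# real timeline: 01:30 (interpreted with the pre-gap GMT offset) comes
# back as 02:30 BST.
round_trip = local.astimezone(ZoneInfo("UTC")).astimezone(london)
```

The 1332637140 and 1332637260 results for 00:59 and 02:01 bracket the gap exactly: they are only 120 seconds apart even though the wall clocks differ by 62 minutes.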
[jira] [Commented] (SPARK-22042) ReorderJoinPredicates can break when child's partitioning is not decided
[ https://issues.apache.org/jira/browse/SPARK-22042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220734#comment-16220734 ] Andrew Ash commented on SPARK-22042: Hi I'm seeing this problem as well, thanks for investigating and putting up a PR [~tejasp]! Have you been running any of your clusters with a patched version of Spark including that change, and has it been behaving as expected? The repro one of my users independently provided was this: {noformat} val rows = List(1, 2, 3, 4, 5, 6); val df1 = sc.parallelize(rows).toDF("col").repartition(1); val df2 = sc.parallelize(rows).toDF("col").repartition(2); val df3 = sc.parallelize(rows).toDF("col").repartition(2); val dd1 = df1.join(df2, df1.col("col").equalTo(df2.col("col"))).join(df3, df2.col("col").equalTo(df3.col("col"))); dd1.show; {noformat} > ReorderJoinPredicates can break when child's partitioning is not decided > > > Key: SPARK-22042 > URL: https://issues.apache.org/jira/browse/SPARK-22042 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0, 2.2.0 >Reporter: Tejas Patil >Priority: Minor > > When `ReorderJoinPredicates` tries to get the `outputPartitioning` of its > children, the children may not be properly constructed as the child-subtree > has to still go through other planner rules. > In this particular case, the child is `SortMergeJoinExec`. Since the required > `Exchange` operators are not in place (because `EnsureRequirements` runs > _after_ `ReorderJoinPredicates`), the join's children would not have > partitioning defined. 
This breaks while creating the `PartitioningCollection` > here: > https://github.com/apache/spark/blob/94439997d57875838a8283c543f9b44705d3a503/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L69 > Small repro: > {noformat} > context.sql("SET spark.sql.autoBroadcastJoinThreshold=0") > val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", > "k") > df.write.format("parquet").saveAsTable("table1") > df.write.format("parquet").saveAsTable("table2") > df.write.format("parquet").bucketBy(8, "j", "k").saveAsTable("bucketed_table") > sql(""" > SELECT * > FROM ( > SELECT a.i, a.j, a.k > FROM bucketed_table a > JOIN table1 b > ON a.i = b.i > ) c > JOIN table2 > ON c.i = table2.i > """).explain > {noformat} > This fails with: > {noformat} > java.lang.IllegalArgumentException: requirement failed: > PartitioningCollection requires all of its partitionings have the same > numPartitions. > at scala.Predef$.require(Predef.scala:224) > at > org.apache.spark.sql.catalyst.plans.physical.PartitioningCollection.(partitioning.scala:324) > at > org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputPartitioning(SortMergeJoinExec.scala:69) > at > org.apache.spark.sql.execution.ProjectExec.outputPartitioning(basicPhysicalOperators.scala:82) > at > org.apache.spark.sql.execution.joins.ReorderJoinPredicates$$anonfun$apply$1.applyOrElse(ReorderJoinPredicates.scala:91) > at > org.apache.spark.sql.execution.joins.ReorderJoinPredicates$$anonfun$apply$1.applyOrElse(ReorderJoinPredicates.scala:76) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289) > at > org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288) > at > 
org.apache.spark.sql.execution.joins.ReorderJoinPredicates.apply(ReorderJoinPredicates.scala:76) > at > org.apache.spark.sql.execution.joins.ReorderJoinPredicates.apply(ReorderJoinPredicates.scala:34) > at > org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:100) > at > org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:100) > at > scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124) > at scala.collection.immutable.List.foldLeft(List.scala:84) > at > org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:100) > at > org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:90) > at > org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:90) > at > org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:201) > at > org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:201) > at >
[jira] [Commented] (SPARK-22364) unix_timestamp function sets valid dates to null
[ https://issues.apache.org/jira/browse/SPARK-22364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220697#comment-16220697 ] Matthew Sinton-Hewitt commented on SPARK-22364: --- Yes I think you're right. However, it would be really great if there were a simple way to tell *unix_timestamp* to use a specific timezone. It should not be assumed that the JVM timezone will apply in all cases. You may be dealing with a variety of date sources. > unix_timestamp function sets valid dates to null > > > Key: SPARK-22364 > URL: https://issues.apache.org/jira/browse/SPARK-22364 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 > Environment: Windows 10, United Kingdom >Reporter: Matthew Sinton-Hewitt > > org.apache.spark.sql.functions.unix_timestamp sets some valid dates to null. > The dates happen to be at the start of Daylight Savings time (UK and > possibly elsewhere). > {code:java} > val spark = SparkSession.builder.getOrCreate() > import spark.implicits._ > spark.sparkContext.parallelize( >Seq("25/03/2012 00:59", "25/03/2012 01:00", "25/03/2012 01:59", > "25/03/2012 02:01")) > .toDF("date") > .select(unix_timestamp($"date", "dd/MM/yyyy HH:mm")) > .show(false) > // results: > // 1332637140, null, null, 1332637260 > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
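The commenter's point — attach the zone explicitly rather than inheriting the host JVM's default — can be illustrated in plain Python (the helper name `parse_with_zone` is made up for the example; it is not a Spark API):

```python
from datetime import datetime
from zoneinfo import ZoneInfo

def parse_with_zone(s: str, fmt: str, zone: str) -> datetime:
    """Parse a naive local-time string and attach an explicit timezone,
    instead of implicitly using whatever the host's default zone is."""
    return datetime.strptime(s, fmt).replace(tzinfo=ZoneInfo(zone))

# Parsed as UTC, "25/03/2012 00:59" yields the same epoch value the JIRA
# reports for the UK (which was still on GMT = UTC at 00:59 that morning).
ts = parse_with_zone("25/03/2012 00:59", "%d/%m/%Y %H:%M", "UTC")
epoch = int(ts.timestamp())
```

The same string parsed under a zone with no DST gap at that instant always succeeds, which is the practical argument for a timezone parameter: the "valid dates to null" behaviour depends entirely on which zone the parser silently assumes.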
[jira] [Commented] (SPARK-20928) SPIP: Continuous Processing Mode for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220694#comment-16220694 ] Cody Koeninger commented on SPARK-20928: Can you clarify how this impacts sinks having access to the underlying kafka offsets, e.g. https://issues.apache.org/jira/browse/SPARK-18258 > SPIP: Continuous Processing Mode for Structured Streaming > - > > Key: SPARK-20928 > URL: https://issues.apache.org/jira/browse/SPARK-20928 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Michael Armbrust > Labels: SPIP > Attachments: Continuous Processing in Structured Streaming Design > Sketch.pdf > > > Given the current Source API, the minimum possible latency for any record is > bounded by the amount of time that it takes to launch a task. This > limitation is a result of the fact that {{getBatch}} requires us to know both > the starting and the ending offset, before any tasks are launched. In the > worst case, the end-to-end latency is actually closer to the average batch > time + task launching time. > For applications where latency is more important than exactly-once output > however, it would be useful if processing could happen continuously. This > would allow us to achieve fully pipelined reading and writing from sources > such as Kafka. This kind of architecture would make it possible to process > records with end-to-end latencies on the order of 1 ms, rather than the > 10-100ms that is possible today. > One possible architecture here would be to change the Source API to look like > the following rough sketch: > {code} > trait Epoch { > def data: DataFrame > /** The exclusive starting position for `data`. */ > def startOffset: Offset > /** The inclusive ending position for `data`. Incrementally updated > during processing, but not complete until execution of the query plan in > `data` is finished. 
*/ > def endOffset: Offset > } > def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], > limits: Limits): Epoch > {code} > The above would allow us to build an alternative implementation of > {{StreamExecution}} that processes continuously with much lower latency and > only stops processing when needing to reconfigure the stream (either due to a > failure or a user requested change in parallelism. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-20643) Implement listener for saving application status data in key-value store
[ https://issues.apache.org/jira/browse/SPARK-20643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Imran Rashid resolved SPARK-20643. -- Resolution: Fixed Fix Version/s: 2.3.0 Issue resolved by pull request 19383 [https://github.com/apache/spark/pull/19383] > Implement listener for saving application status data in key-value store > > > Key: SPARK-20643 > URL: https://issues.apache.org/jira/browse/SPARK-20643 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: Marcelo Vanzin > Fix For: 2.3.0 > > > See spec in parent issue (SPARK-18085) for more details. > This task tracks adding a new listener that will save application state to > the key-value store added in SPARK-20641; the listener will eventually > replace the existing listeners (such as JobProgressListener and > StatusListener), and the UI code will read data directly from the key-value > store instead of being coupled to the listener implementation. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-20643) Implement listener for saving application status data in key-value store
[ https://issues.apache.org/jira/browse/SPARK-20643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Imran Rashid reassigned SPARK-20643: Assignee: Marcelo Vanzin > Implement listener for saving application status data in key-value store > > > Key: SPARK-20643 > URL: https://issues.apache.org/jira/browse/SPARK-20643 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: Marcelo Vanzin >Assignee: Marcelo Vanzin > Fix For: 2.3.0 > > > See spec in parent issue (SPARK-18085) for more details. > This task tracks adding a new listener that will save application state to > the key-value store added in SPARK-20641; the listener will eventually > replace the existing listeners (such as JobProgressListener and > StatusListener), and the UI code will read data directly from the key-value > store instead of being coupled to the listener implementation. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22365) Spark UI executors empty list with 500 error
[ https://issues.apache.org/jira/browse/SPARK-22365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220631#comment-16220631 ] Sean Owen commented on SPARK-22365: --- Is there no other error following this? A "Caused by" log? If it's really an error from inside Jersey, I'm not sure it's attributable to Spark, but it still bears investigating. If it's actually an NPE from Spark code, then we need to see where. Unless anyone can reproduce it, it's hard to say what could be done. > Spark UI executors empty list with 500 error > > > Key: SPARK-22365 > URL: https://issues.apache.org/jira/browse/SPARK-22365 > Project: Spark > Issue Type: Bug > Components: Web UI >Affects Versions: 2.2.0 >Reporter: Jakub Dubovsky > > No data loaded on "executors" tab in sparkUI with stack trace below. Apart > from the exception I have nothing more. But if I can test something to make this > easier to resolve I am happy to help. > {{java.lang.NullPointerException > at > org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388) > at > org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341) > at > org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228) > at > org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:845) > at > org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1689) > at > org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:164) > at > org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1676) > at > org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:581) > at > org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180) > at > org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511) > at > org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112) > at > 
org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) > at > org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:461) > at > org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213) > at > org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134) > at org.spark_project.jetty.server.Server.handle(Server.java:524) > at > org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:319) > at > org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:253) > at > org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273) > at > org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:95) > at > org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93) > at > org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303) > at > org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148) > at > org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136) > at > org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671) > at > org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589) > at java.lang.Thread.run(Thread.java:748)}} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22365) Spark UI executors empty list with 500 error
[ https://issues.apache.org/jira/browse/SPARK-22365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220630#comment-16220630 ] Jakub Dubovsky commented on SPARK-22365: Is there anything I can do to investigate? I am a backend developer, so maybe I am missing some trivial ways to check something... > Spark UI executors empty list with 500 error > > > Key: SPARK-22365 > URL: https://issues.apache.org/jira/browse/SPARK-22365 > Project: Spark > Issue Type: Bug > Components: Web UI >Affects Versions: 2.2.0 >Reporter: Jakub Dubovsky > > No data loaded on "executors" tab in sparkUI with stack trace below. Apart > from the exception I have nothing more. But if I can test something to make this > easier to resolve I am happy to help. > {{java.lang.NullPointerException > at > org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388) > at > org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341) > at > org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228) > at > org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:845) > at > org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1689) > at > org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:164) > at > org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1676) > at > org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:581) > at > org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180) > at > org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511) > at > org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112) > at > org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) > at > org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:461) > at > 
org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213) > at > org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134) > at org.spark_project.jetty.server.Server.handle(Server.java:524) > at > org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:319) > at > org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:253) > at > org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273) > at > org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:95) > at > org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93) > at > org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303) > at > org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148) > at > org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136) > at > org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671) > at > org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589) > at java.lang.Thread.run(Thread.java:748)}} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22365) Spark UI executors empty list with 500 error
[ https://issues.apache.org/jira/browse/SPARK-22365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220627#comment-16220627 ] Sean Owen commented on SPARK-22365: --- This doesn't actually show the underlying error; it just reports that an NPE happened elsewhere. Hard to do anything without more info, and I haven't seen this. > Spark UI executors empty list with 500 error > > > Key: SPARK-22365 > URL: https://issues.apache.org/jira/browse/SPARK-22365 > Project: Spark > Issue Type: Bug > Components: Web UI >Affects Versions: 2.2.0 >Reporter: Jakub Dubovsky > > No data loaded on "executors" tab in sparkUI with stack trace below. Apart > from the exception I have nothing more. But if I can test something to make this > easier to resolve I am happy to help. > {{java.lang.NullPointerException > at > org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388) > at > org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341) > at > org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228) > at > org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:845) > at > org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1689) > at > org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:164) > at > org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1676) > at > org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:581) > at > org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180) > at > org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511) > at > org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112) > at > org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) > at > 
org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:461) > at > org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213) > at > org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134) > at org.spark_project.jetty.server.Server.handle(Server.java:524) > at > org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:319) > at > org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:253) > at > org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273) > at > org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:95) > at > org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93) > at > org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303) > at > org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148) > at > org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136) > at > org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671) > at > org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589) > at java.lang.Thread.run(Thread.java:748)}} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-22365) Spark UI executors empty list with 500 error
Jakub Dubovsky created SPARK-22365: -- Summary: Spark UI executors empty list with 500 error Key: SPARK-22365 URL: https://issues.apache.org/jira/browse/SPARK-22365 Project: Spark Issue Type: Bug Components: Web UI Affects Versions: 2.2.0 Reporter: Jakub Dubovsky No data loaded on "execturos" tab in sparkUI with stack trace below. Apart from exception I have nothing more. But if I can test something to make this easier to resolve I am happy to help. {{java.lang.NullPointerException at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388) at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341) at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228) at org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:845) at org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1689) at org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:164) at org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1676) at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:581) at org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180) at org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511) at org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112) at org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) at org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:461) at org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213) at org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134) at org.spark_project.jetty.server.Server.handle(Server.java:524) at org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:319) at 
org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:253) at org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273) at org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:95) at org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93) at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303) at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148) at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136) at org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671) at org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589) at java.lang.Thread.run(Thread.java:748)}} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22364) unix_timestamp function sets valid dates to null
[ https://issues.apache.org/jira/browse/SPARK-22364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220483#comment-16220483 ] Sean Owen commented on SPARK-22364: --- Is the problem that this hour doesn't exist in the timezone because it's the hour that is skipped when the clocks move forwards? See https://stackoverflow.com/questions/14201469/java-date-and-daylight-saving which actually talks about the exact same date. > unix_timestamp function sets valid dates to null > > > Key: SPARK-22364 > URL: https://issues.apache.org/jira/browse/SPARK-22364 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 > Environment: Windows 10, United Kingdom >Reporter: Matthew Sinton-Hewitt > > org.apache.spark.sql.functions.unix_timestamp sets some valid dates to null. > The dates happen to be the at the start of Daylight Savings time (UK and > possibly elsewhere). > {code:java} > val spark = SparkSession.builder.getOrCreate() > import spark.implicits._ > spark.sparkContext.parallelize( >Seq("25/03/2012 00:59", "25/03/2012 01:00", "25/03/2012 01:59", > "25/03/2012 02:01")) > .toDF("date") > .select(unix_timestamp($"date", "dd/MM/ HH:mm")) > .show(false) > // results: > // 1332637140, null, null, 1332637260 > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
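Sean Owen's daylight-saving explanation can be checked outside Spark entirely. The sketch below (Python standard library `zoneinfo`; the `Europe/London` zone is an assumption inferred from the reporter's UK environment) shows that wall-clock times between 01:00 and 01:59 simply did not exist on 25 March 2012 in the UK, which is exactly the range that parses to null, and that the two surviving timestamps match the reported values:

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo  # standard library since Python 3.9

# On 25 March 2012 UK clocks jumped from 01:00 GMT straight to 02:00 BST,
# so local times in [01:00, 02:00) never existed that day.
london = ZoneInfo("Europe/London")

before = datetime(2012, 3, 25, 0, 59, tzinfo=london)  # valid, still GMT
after = datetime(2012, 3, 25, 2, 1, tzinfo=london)    # valid, already BST

print(before.utcoffset())       # 0:00:00  (GMT)
print(after.utcoffset())        # 1:00:00  (BST)
print(int(before.timestamp()))  # 1332637140, matching the report
print(int(after.timestamp()))   # 1332637260, matching the report
```

The one-hour gap between the offsets is the skipped hour; a strict parser has no UTC instant to map "25/03/2012 01:00" or "01:59" to.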
[jira] [Updated] (SPARK-22364) unix_timestamp function sets valid dates to null
[ https://issues.apache.org/jira/browse/SPARK-22364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthew Sinton-Hewitt updated SPARK-22364: -- Description: org.apache.spark.sql.functions.unix_timestamp sets some valid dates to null. The dates happen to be the at the start of Daylight Savings time (UK and possibly elsewhere). {code:java} val spark = SparkSession.builder.getOrCreate() import spark.implicits._ spark.sparkContext.parallelize( Seq("25/03/2012 00:59", "25/03/2012 01:00", "25/03/2012 01:59", "25/03/2012 02:01")) .toDF("date") .select(unix_timestamp($"date", "dd/MM/ HH:mm")) .show(false) // results: // 1332637140, null, null, 1332637260 {code} was: org.apache.spark.sql.functions.unix_timestamp sets some valid dates to null. The dates happen to be the at the start of Daylight Savings time (UK and possibly elsewhere). {code:java} import spark.implicits._ SparkSession.builder.getOrCreate() .sparkContext.parallelize( Seq("25/03/2012 00:59", "25/03/2012 01:00", "25/03/2012 01:59", "25/03/2012 02:01")) .toDF("date") .select(unix_timestamp($"date", "dd/MM/ HH:mm")) .show(false) // results: // 1332637140, null, null, 1332637260 {code} > unix_timestamp function sets valid dates to null > > > Key: SPARK-22364 > URL: https://issues.apache.org/jira/browse/SPARK-22364 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 > Environment: Windows 10, United Kingdom >Reporter: Matthew Sinton-Hewitt > > org.apache.spark.sql.functions.unix_timestamp sets some valid dates to null. > The dates happen to be the at the start of Daylight Savings time (UK and > possibly elsewhere). 
> {code:java} > val spark = SparkSession.builder.getOrCreate() > import spark.implicits._ > spark.sparkContext.parallelize( >Seq("25/03/2012 00:59", "25/03/2012 01:00", "25/03/2012 01:59", > "25/03/2012 02:01")) > .toDF("date") > .select(unix_timestamp($"date", "dd/MM/ HH:mm")) > .show(false) > // results: > // 1332637140, null, null, 1332637260 > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-22364) unix_timestamp function sets valid dates to null
[ https://issues.apache.org/jira/browse/SPARK-22364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthew Sinton-Hewitt updated SPARK-22364: -- Description: org.apache.spark.sql.functions.unix_timestamp sets some valid dates to null. The dates happen to be the at the start of Daylight Savings time (UK and possibly elsewhere). {code:java} import spark.implicits._ SparkSession.builder.getOrCreate() .sparkContext.parallelize( Seq("25/03/2012 00:59", "25/03/2012 01:00", "25/03/2012 01:59", "25/03/2012 02:01")) .toDF("date") .select(unix_timestamp($"date", "dd/MM/ HH:mm")) .show(false) // results: // 1332637140, null, null, 1332637260 {code} was: org.apache.spark.sql.functions.unix_timestamp sets some valid dates to null. The dates happen to be the at the start of Daylight Savings time (UK and possibly elsewhere). {code:scala} import spark.implicits._ SparkSession.builder.getOrCreate() .sparkContext.parallelize( Seq("25/03/2012 00:59", "25/03/2012 01:00", "25/03/2012 01:59", "25/03/2012 02:01")) .toDF("date") .select(unix_timestamp($"date", "dd/MM/ HH:mm")) .show(false) // results: // 1332637140, null, null, 1332637260 {code} > unix_timestamp function sets valid dates to null > > > Key: SPARK-22364 > URL: https://issues.apache.org/jira/browse/SPARK-22364 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 > Environment: Windows 10, United Kingdom >Reporter: Matthew Sinton-Hewitt > > org.apache.spark.sql.functions.unix_timestamp sets some valid dates to null. > The dates happen to be the at the start of Daylight Savings time (UK and > possibly elsewhere). 
> {code:java} > import spark.implicits._ > SparkSession.builder.getOrCreate() > .sparkContext.parallelize( >Seq("25/03/2012 00:59", "25/03/2012 01:00", "25/03/2012 01:59", > "25/03/2012 02:01")) > .toDF("date") > .select(unix_timestamp($"date", "dd/MM/ HH:mm")) > .show(false) > // results: > // 1332637140, null, null, 1332637260 > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)
[ https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220469#comment-16220469 ] Sean Owen commented on SPARK-21657: --- I suspect that something somewhere is doing something that's linear-time that looks like it should be constant-time, like referencing a linked list by index. See https://issues.apache.org/jira/browse/SPARK-22330 for a similar type of thing (though don't think it's the same issue as this one) > Spark has exponential time complexity to explode(array of structs) > -- > > Key: SPARK-21657 > URL: https://issues.apache.org/jira/browse/SPARK-21657 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 2.0.0, 2.1.0, 2.1.1, 2.2.0, 2.3.0 >Reporter: Ruslan Dautkhanov > Labels: cache, caching, collections, nested_types, performance, > pyspark, sparksql, sql > Attachments: ExponentialTimeGrowth.PNG, > nested-data-generator-and-test.py > > > It can take up to half a day to explode a modest-sized nested collection > (0.5m). > On a recent Xeon processors. > See attached pyspark script that reproduces this problem. > {code} > cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + > table_name).cache() > print sqlc.count() > {code} > This script generate a number of tables, with the same total number of > records across all nested collection (see `scaling` variable in loops). > `scaling` variable scales up how many nested elements in each record, but by > the same factor scales down number of records in the table. So total number > of records stays the same. > Time grows exponentially (notice log-10 vertical axis scale): > !ExponentialTimeGrowth.PNG! > At scaling of 50,000 (see attached pyspark script), it took 7 hours to > explode the nested collections (\!) of 8k records. > After 1000 elements in nested collection, time grows exponentially. 
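Sean Owen's hypothesis, a linear-time operation hiding behind a constant-time-looking access, is easy to reproduce in miniature. This is not Spark's code, just a minimal Python illustration of the failure mode: positional access into a linked list re-walks the list on every call, so iterating "by index" is quadratic overall even though each call looks cheap:

```python
# Minimal illustration (not Spark source): an access that looks O(1) but is O(i).
class Node:
    def __init__(self, value, nxt=None):
        self.value, self.next = value, nxt

def build(n):
    """Build the linked list 0 -> 1 -> ... -> n-1."""
    head = None
    for v in range(n - 1, -1, -1):
        head = Node(v, head)
    return head

def get(head, i):
    """Positional access: walks i links from the head on every call."""
    node = head
    for _ in range(i):
        node = node.next
    return node.value

def sum_by_index(head, n):
    # O(n^2) in total: every get() re-walks the list from the start.
    return sum(get(head, i) for i in range(n))

def sum_by_cursor(head):
    # O(n): keep a cursor between elements instead of re-walking.
    total, node = 0, head
    while node is not None:
        total += node.value
        node = node.next
    return total
```

Scaling the element count up by k multiplies `sum_by_index`'s work by roughly k², which is the superlinear growth pattern the attached benchmark shows, even though nothing is literally exponential.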
[jira] [Created] (SPARK-22364) unix_timestamp function sets valid dates to null
Matthew Sinton-Hewitt created SPARK-22364: - Summary: unix_timestamp function sets valid dates to null Key: SPARK-22364 URL: https://issues.apache.org/jira/browse/SPARK-22364 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.2.0 Environment: Windows 10, United Kingdom Reporter: Matthew Sinton-Hewitt org.apache.spark.sql.functions.unix_timestamp sets some valid dates to null. The dates happen to be at the start of Daylight Saving Time (UK and possibly elsewhere). {code:scala} import spark.implicits._ SparkSession.builder.getOrCreate() .sparkContext.parallelize( Seq("25/03/2012 00:59", "25/03/2012 01:00", "25/03/2012 01:59", "25/03/2012 02:01")) .toDF("date") .select(unix_timestamp($"date", "dd/MM/yyyy HH:mm")) .show(false) // results: // 1332637140, null, null, 1332637260 {code}
[jira] [Commented] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)
[ https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220450#comment-16220450 ] Ohad Raviv commented on SPARK-21657: Hi, Wanted to add that we're facing exactly the same issue. 6 hours work for one row that contains 250k array (of struct of 4 strings). Just wanted to state that if we explode only the array, e.g, in your example: cached_df = sqlc.sql('select explode(amft) from ' + table_name) it finishes in about 3 mins. it happens in Spark 2.1 and also 2.2, eventhough SPARK-16998 was resolved. > Spark has exponential time complexity to explode(array of structs) > -- > > Key: SPARK-21657 > URL: https://issues.apache.org/jira/browse/SPARK-21657 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 2.0.0, 2.1.0, 2.1.1, 2.2.0, 2.3.0 >Reporter: Ruslan Dautkhanov > Labels: cache, caching, collections, nested_types, performance, > pyspark, sparksql, sql > Attachments: ExponentialTimeGrowth.PNG, > nested-data-generator-and-test.py > > > It can take up to half a day to explode a modest-sized nested collection > (0.5m). > On a recent Xeon processors. > See attached pyspark script that reproduces this problem. > {code} > cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + > table_name).cache() > print sqlc.count() > {code} > This script generate a number of tables, with the same total number of > records across all nested collection (see `scaling` variable in loops). > `scaling` variable scales up how many nested elements in each record, but by > the same factor scales down number of records in the table. So total number > of records stays the same. > Time grows exponentially (notice log-10 vertical axis scale): > !ExponentialTimeGrowth.PNG! > At scaling of 50,000 (see attached pyspark script), it took 7 hours to > explode the nested collections (\!) of 8k records. > After 1000 elements in nested collection, time grows exponentially. 
[jira] [Commented] (SPARK-22357) SparkContext.binaryFiles ignore minPartitions parameter
[ https://issues.apache.org/jira/browse/SPARK-22357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220431#comment-16220431 ] Saisai Shao commented on SPARK-22357: - Yes, I know this parameter is ignored, but I'm not sure is it intended or not. If it breaks your case I think we should fix it anyway. > SparkContext.binaryFiles ignore minPartitions parameter > --- > > Key: SPARK-22357 > URL: https://issues.apache.org/jira/browse/SPARK-22357 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.2, 2.2.0 >Reporter: Weichen Xu > > this is a bug in binaryFiles - even though we give it the partitions, > binaryFiles ignores it. > This is a bug introduced in spark 2.1 from spark 2.0, in file > PortableDataStream.scala the argument “minPartitions” is no longer used (with > the push to master on 11/7/6): > {code} > /** > Allow minPartitions set by end-user in order to keep compatibility with old > Hadoop API > which is set through setMaxSplitSize > */ > def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: > Int) { > val defaultMaxSplitBytes = > sc.getConf.get(config.FILES_MAX_PARTITION_BYTES) > val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES) > val defaultParallelism = sc.defaultParallelism > val files = listStatus(context).asScala > val totalBytes = files.filterNot(.isDirectory).map(.getLen + > openCostInBytes).sum > val bytesPerCore = totalBytes / defaultParallelism > val maxSplitSize = Math.min(defaultMaxSplitBytes, > Math.max(openCostInBytes, bytesPerCore)) > super.setMaxSplitSize(maxSplitSize) > } > {code} > The code previously, in version 2.0, was: > {code} > def setMinPartitions(context: JobContext, minPartitions: Int) { > val totalLen = > listStatus(context).asScala.filterNot(.isDirectory).map(.getLen).sum > val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, > 1.0)).toLong > super.setMaxSplitSize(maxSplitSize) > } > {code} > The new code is very smart, but it 
ignores what the user passes in and uses > the data size, which is kind of a breaking change in some sense > In our specific case this was a problem, because we initially read in just > the files names and only after that the dataframe becomes very large, when > reading in the images themselves – and in this case the new code does not > handle the partitioning very well. > I’m not sure if it can be easily fixed because I don’t understand the full > context of the change in spark (but at the very least the unused parameter > should be removed to avoid confusion). -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
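The two quoted versions of `setMinPartitions` differ only in how `maxSplitSize` is derived. A schematic re-derivation in Python (not Spark source; the byte constants are the documented defaults for `spark.files.maxPartitionBytes` of 128 MB and `spark.files.openCostInBytes` of 4 MB) makes the behavioral change concrete:

```python
import math

MAX_PARTITION_BYTES = 128 * 1024 * 1024  # spark.files.maxPartitionBytes default
OPEN_COST_IN_BYTES = 4 * 1024 * 1024     # spark.files.openCostInBytes default

def max_split_size_v20(total_len, min_partitions):
    # Spark 2.0: the user's minPartitions directly bounds the split size.
    return math.ceil(total_len / max(min_partitions, 1.0))

def max_split_size_v21(file_lens, default_parallelism):
    # Spark 2.1: minPartitions is accepted but never read; the split size
    # comes from total data size and cluster parallelism instead.
    total_bytes = sum(l + OPEN_COST_IN_BYTES for l in file_lens)
    bytes_per_core = total_bytes // default_parallelism
    return min(MAX_PARTITION_BYTES, max(OPEN_COST_IN_BYTES, bytes_per_core))

# 1000 small files with minPartitions=1000: 2.0 honors the request with tiny
# splits, while 2.1 picks a split size driven only by data size and cores.
print(max_split_size_v20(1000 * 1024, 1000))  # 1024 bytes -> ~1000 splits
print(max_split_size_v21([1024] * 1000, 8))   # capped at MAX_PARTITION_BYTES
```

This matches the reporter's scenario: when the input is a small list of file names that fans out into large data later, the 2.1 formula sees little data up front and produces far fewer partitions than requested.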
[jira] [Commented] (SPARK-22357) SparkContext.binaryFiles ignore minPartitions parameter
[ https://issues.apache.org/jira/browse/SPARK-22357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220417#comment-16220417 ] Weichen Xu commented on SPARK-22357: [~jerryshao] I checked the code, it ignore the `minPartitions` parameter indeed. About these text, [~imatiach] Can you help explain more ? > SparkContext.binaryFiles ignore minPartitions parameter > --- > > Key: SPARK-22357 > URL: https://issues.apache.org/jira/browse/SPARK-22357 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.2, 2.2.0 >Reporter: Weichen Xu > > this is a bug in binaryFiles - even though we give it the partitions, > binaryFiles ignores it. > This is a bug introduced in spark 2.1 from spark 2.0, in file > PortableDataStream.scala the argument “minPartitions” is no longer used (with > the push to master on 11/7/6): > {code} > /** > Allow minPartitions set by end-user in order to keep compatibility with old > Hadoop API > which is set through setMaxSplitSize > */ > def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: > Int) { > val defaultMaxSplitBytes = > sc.getConf.get(config.FILES_MAX_PARTITION_BYTES) > val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES) > val defaultParallelism = sc.defaultParallelism > val files = listStatus(context).asScala > val totalBytes = files.filterNot(.isDirectory).map(.getLen + > openCostInBytes).sum > val bytesPerCore = totalBytes / defaultParallelism > val maxSplitSize = Math.min(defaultMaxSplitBytes, > Math.max(openCostInBytes, bytesPerCore)) > super.setMaxSplitSize(maxSplitSize) > } > {code} > The code previously, in version 2.0, was: > {code} > def setMinPartitions(context: JobContext, minPartitions: Int) { > val totalLen = > listStatus(context).asScala.filterNot(.isDirectory).map(.getLen).sum > val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, > 1.0)).toLong > super.setMaxSplitSize(maxSplitSize) > } > {code} > The new code is very smart, but it 
ignores what the user passes in and uses > the data size, which is kind of a breaking change in some sense > In our specific case this was a problem, because we initially read in just > the files names and only after that the dataframe becomes very large, when > reading in the images themselves – and in this case the new code does not > handle the partitioning very well. > I’m not sure if it can be easily fixed because I don’t understand the full > context of the change in spark (but at the very least the unused parameter > should be removed to avoid confusion). -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22357) SparkContext.binaryFiles ignore minPartitions parameter
[ https://issues.apache.org/jira/browse/SPARK-22357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220395#comment-16220395 ] Saisai Shao commented on SPARK-22357: - bq. In our specific case this was a problem, because we initially read in just the files names and only after that the dataframe becomes very large, when reading in the images themselves – and in this case the new code does not handle the partitioning very well. Would you please explain more about this? > SparkContext.binaryFiles ignore minPartitions parameter > --- > > Key: SPARK-22357 > URL: https://issues.apache.org/jira/browse/SPARK-22357 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.2, 2.2.0 >Reporter: Weichen Xu > > this is a bug in binaryFiles - even though we give it the partitions, > binaryFiles ignores it. > This is a bug introduced in spark 2.1 from spark 2.0, in file > PortableDataStream.scala the argument “minPartitions” is no longer used (with > the push to master on 11/7/6): > {code} > /** > Allow minPartitions set by end-user in order to keep compatibility with old > Hadoop API > which is set through setMaxSplitSize > */ > def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: > Int) { > val defaultMaxSplitBytes = > sc.getConf.get(config.FILES_MAX_PARTITION_BYTES) > val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES) > val defaultParallelism = sc.defaultParallelism > val files = listStatus(context).asScala > val totalBytes = files.filterNot(.isDirectory).map(.getLen + > openCostInBytes).sum > val bytesPerCore = totalBytes / defaultParallelism > val maxSplitSize = Math.min(defaultMaxSplitBytes, > Math.max(openCostInBytes, bytesPerCore)) > super.setMaxSplitSize(maxSplitSize) > } > {code} > The code previously, in version 2.0, was: > {code} > def setMinPartitions(context: JobContext, minPartitions: Int) { > val totalLen = > listStatus(context).asScala.filterNot(.isDirectory).map(.getLen).sum 
> val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, > 1.0)).toLong > super.setMaxSplitSize(maxSplitSize) > } > {code} > The new code is very smart, but it ignores what the user passes in and uses > the data size, which is kind of a breaking change in some sense > In our specific case this was a problem, because we initially read in just > the files names and only after that the dataframe becomes very large, when > reading in the images themselves – and in this case the new code does not > handle the partitioning very well. > I’m not sure if it can be easily fixed because I don’t understand the full > context of the change in spark (but at the very least the unused parameter > should be removed to avoid confusion). -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22323) Design doc for different types of pandas_udf
[ https://issues.apache.org/jira/browse/SPARK-22323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220373#comment-16220373 ] Hyukjin Kwon commented on SPARK-22323: -- Let me leave a link to the discussion doc - https://docs.google.com/document/d/1KlLaa-xJ3oz28xlEJqXyCAHU3dwFYkFs_ixcUXrJNTc > Design doc for different types of pandas_udf > > > Key: SPARK-22323 > URL: https://issues.apache.org/jira/browse/SPARK-22323 > Project: Spark > Issue Type: Sub-task > Components: PySpark >Affects Versions: 2.2.0 >Reporter: Li Jin > -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-22363) Add unit test for Window spilling
Jiang Xingbo created SPARK-22363: Summary: Add unit test for Window spilling Key: SPARK-22363 URL: https://issues.apache.org/jira/browse/SPARK-22363 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.3.0 Reporter: Jiang Xingbo Cover the scenarios in which WindowExec should spill at least once.
[jira] [Created] (SPARK-22362) Add unit test for Window Aggregate Functions
Jiang Xingbo created SPARK-22362: Summary: Add unit test for Window Aggregate Functions Key: SPARK-22362 URL: https://issues.apache.org/jira/browse/SPARK-22362 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.3.0 Reporter: Jiang Xingbo * Declarative * Imperative * UDAF -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-22361) Add unit test for Window Frames
Jiang Xingbo created SPARK-22361: Summary: Add unit test for Window Frames Key: SPARK-22361 URL: https://issues.apache.org/jira/browse/SPARK-22361 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.3.0 Reporter: Jiang Xingbo * OffsetWindowFunctionFrame * UnboundedWindowFunctionFrame * SlidingWindowFunctionFrame * UnboundedPrecedingWindowFunctionFrame * UnboundedFollowingWindowFunctionFrame -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-22360) Add unit test for Window Specifications
Jiang Xingbo created SPARK-22360: Summary: Add unit test for Window Specifications Key: SPARK-22360 URL: https://issues.apache.org/jira/browse/SPARK-22360 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.3.0 Reporter: Jiang Xingbo * different partition clauses (none, one, multiple) * different order clauses (none, one, multiple, asc/desc, nulls first/last) -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-17902) collect() ignores stringsAsFactors
[ https://issues.apache.org/jira/browse/SPARK-17902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon reassigned SPARK-17902: Assignee: Hyukjin Kwon > collect() ignores stringsAsFactors > -- > > Key: SPARK-17902 > URL: https://issues.apache.org/jira/browse/SPARK-17902 > Project: Spark > Issue Type: Bug > Components: SparkR >Affects Versions: 2.0.1 >Reporter: Hossein Falaki >Assignee: Hyukjin Kwon > Fix For: 2.1.3, 2.2.1, 2.3.0 > > > `collect()` function signature includes an optional flag named > `stringsAsFactors`. It seems it is completely ignored. > {code} > str(collect(createDataFrame(iris), stringsAsFactors = TRUE))) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-17902) collect() ignores stringsAsFactors
[ https://issues.apache.org/jira/browse/SPARK-17902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-17902. -- Resolution: Fixed Fix Version/s: 2.1.3 2.3.0 2.2.1 Issue resolved by pull request 19551 [https://github.com/apache/spark/pull/19551] > collect() ignores stringsAsFactors > -- > > Key: SPARK-17902 > URL: https://issues.apache.org/jira/browse/SPARK-17902 > Project: Spark > Issue Type: Bug > Components: SparkR >Affects Versions: 2.0.1 >Reporter: Hossein Falaki > Fix For: 2.2.1, 2.3.0, 2.1.3 > > > `collect()` function signature includes an optional flag named > `stringsAsFactors`. It seems it is completely ignored. > {code} > str(collect(createDataFrame(iris), stringsAsFactors = TRUE))) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-22359) Improve the test coverage of window functions
Jiang Xingbo created SPARK-22359: Summary: Improve the test coverage of window functions Key: SPARK-22359 URL: https://issues.apache.org/jira/browse/SPARK-22359 Project: Spark Issue Type: Test Components: SQL Affects Versions: 2.3.0 Reporter: Jiang Xingbo There are already quite a few integration tests using window functions, but the unit test coverage for window functions is not ideal. We'd like to test the following aspects: * Specifications ** different partition clauses (none, one, multiple) ** different order clauses (none, one, multiple, asc/desc, nulls first/last) * Frames and their combinations ** OffsetWindowFunctionFrame ** UnboundedWindowFunctionFrame ** SlidingWindowFunctionFrame ** UnboundedPrecedingWindowFunctionFrame ** UnboundedFollowingWindowFunctionFrame * Aggregate function types ** Declarative ** Imperative ** UDAF * Spilling ** Cover the conditions in which WindowExec should spill at least once
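The frame taxonomy in this umbrella ticket is not Spark-specific. A compact way to see running (unbounded-preceding), sliding, and offset frames side by side is plain SQL; the sketch below uses Python's built-in `sqlite3` (window functions require SQLite 3.25+, an assumption about the local build) rather than Spark itself:

```python
import sqlite3

# Three of the frame kinds named above, expressed as standard SQL window
# clauses over a tiny two-partition table.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE t(k TEXT, v INT)")
con.executemany("INSERT INTO t VALUES (?, ?)",
                [("a", 1), ("a", 2), ("a", 3), ("b", 10)])
rows = con.execute("""
    SELECT k, v,
           SUM(v) OVER (PARTITION BY k ORDER BY v
                        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running,
           SUM(v) OVER (PARTITION BY k ORDER BY v
                        ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)         AS sliding,
           LAG(v)  OVER (PARTITION BY k ORDER BY v)                       AS offset_prev
    FROM t ORDER BY k, v
""").fetchall()
for r in rows:
    print(r)
```

Each output row carries the running total, the three-row sliding sum, and the previous value within its partition; unit tests for `WindowExec` exercise the same combinations with Spark's own frame implementations.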
[jira] [Commented] (SPARK-22358) ClassNotFoundException at NettyRpcEnv#deserialize when running Spark on Yarn-Cluster
[ https://issues.apache.org/jira/browse/SPARK-22358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220280#comment-16220280 ] Congxian Qiu commented on SPARK-22358: -- I run the same program many times, got the above error some times, the other time the program run succeed. didn't know how to reproduce the error > ClassNotFoundException at NettyRpcEnv#deserialize when running Spark on > Yarn-Cluster > > > Key: SPARK-22358 > URL: https://issues.apache.org/jira/browse/SPARK-22358 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.6.2 >Reporter: Congxian Qiu > > When running a Spark Streaming program on Yarn-Cluster mode, received the > following ERROR message, and the program hanged. I found > [Spark-10986](https://issues.apache.org/jira/browse/SPARK-10986) is similiar, > I can't reproduce the error > {noformat} > [2017-10-26 16:53:18,274] ERROR Error while invoking RpcHandler#receive() for > one-way message. (org.apache.spark.network.server.TransportRequestHandler) > java.lang.ClassNotFoundException: org.apache.spark.rpc.RpcAddvess > at java.net.URLClassLoader$1.run(URLClassLoader.java:366) > at java.net.URLClassLoader$1.run(URLClassLoader.java:355) > at java.security.AccessController.doPrivileged(Native Method) > at java.net.URLClassLoader.findClass(URLClassLoader.java:354) > at java.lang.ClassLoader.loadClass(ClassLoader.java:425) > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) > at java.lang.ClassLoader.loadClass(ClassLoader.java:358) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:274) > at > org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) > at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) > at > org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76) > at > org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:109) > at > org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1$$anonfun$apply$1.apply(NettyRpcEnv.scala:267) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) > at > org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:319) > at > org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1.apply(NettyRpcEnv.scala:266) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) > at > org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:265) > at > org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:597) > at > org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:586) > at > org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:176) > at > org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:92) > at > org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104) > at > org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51) > at > io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) > at > 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) > at > io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) > at > io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) > at >
[jira] [Comment Edited] (SPARK-9686) Spark Thrift server doesn't return correct JDBC metadata
[ https://issues.apache.org/jira/browse/SPARK-9686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16217201#comment-16217201 ] Andriy Kushnir edited comment on SPARK-9686 at 10/26/17 10:18 AM: -- [~rxin], I did a little research for this error. To invoke {{run()}} → {{runInternal()}} on any {{org.apache.hive.service.cli.operation.Operation}} (for example, {{GetSchemasOperation}}) we need an {{IMetaStoreClient}}. Currently it's taken from the {{HiveSession}} instance: {code:java} public class GetSchemasOperation extends MetadataOperation { @Override public void runInternal() throws HiveSQLException { IMetaStoreClient metastoreClient = getParentSession().getMetaStoreClient(); } } {code} All opened {{HiveSession}}s are handled by the {{org.apache.hive.service.cli.session.SessionManager}} instance. {{SessionManager}}, among others, implements the {{org.apache.hive.service.Service}} interface, and all {{Service}}s are initialized with the same Hive configuration: {code:java} public interface Service { void init(HiveConf conf); } {code} When {{org.apache.spark.sql.hive.thriftserver.HiveThriftServer2}} initializes, all {{org.apache.hive.service.CompositeService}}s receive the same {{HiveConf}}: {code:java} private[hive] class HiveThriftServer2(sqlContext: SQLContext) extends HiveServer2 with ReflectedCompositeService { override def init(hiveConf: HiveConf) { initCompositeService(hiveConf) } } object HiveThriftServer2 extends Logging { @DeveloperApi def startWithContext(sqlContext: SQLContext): Unit = { val server = new HiveThriftServer2(sqlContext) val executionHive = HiveUtils.newClientForExecution( sqlContext.sparkContext.conf, sqlContext.sessionState.newHadoopConf()) server.init(executionHive.conf) } } {code} So, {{HiveUtils#newClientForExecution()}} returns an implementation of {{IMetaStoreClient}} which *ALWAYS* points to the Derby metastore (see docstrings and comments in {{org.apache.spark.sql.hive.HiveUtils#newTemporaryConfiguration()}}). IMHO, to get correct metadata 
we need to additionally create another {{IMetaStoreClient}} with {{newClientForMetadata()}}, and pass its {{HiveConf}} to the underlying {{Service}}s. > Spark Thrift server doesn't return correct JDBC metadata > - > > Key: SPARK-9686 > URL: https://issues.apache.org/jira/browse/SPARK-9686 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.4.0, 1.4.1, 1.5.0, 1.5.1, 1.5.2 >Reporter: pin_zhang >Priority: Critical > Attachments: SPARK-9686.1.patch.txt > > > 1. Start start-thriftserver.sh > 2. connect with beeline > 3. create table > 4. show tables, the
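The initialization problem described above can be illustrated with a tiny, self-contained sketch. This is plain Python, not the actual Hive/Spark classes; the class and method names only mirror the ones quoted above, and the conf dictionaries are invented stand-ins for {{HiveConf}}:

```python
# Hedged sketch of the issue: every Service is initialized with the HiveConf
# of the *execution* client, which always points at the temporary Derby
# metastore, so metadata operations never see the real metastore.
class Service:
    def init(self, conf):
        self.conf = conf

class GetSchemasOperation(Service):
    def run(self):
        # metadata comes from whatever conf this service was initialized with
        return self.conf["metastore.uri"]

# invented stand-ins for newClientForExecution() / newClientForMetadata() confs
execution_conf = {"metastore.uri": "jdbc:derby:memory:temp"}
metadata_conf = {"metastore.uri": "thrift://real-metastore:9083"}

op = GetSchemasOperation()
op.init(execution_conf)  # what HiveThriftServer2 effectively does today
assert op.run() == "jdbc:derby:memory:temp"  # wrong metadata source

op.init(metadata_conf)  # the proposed fix: pass the metadata client's conf
assert op.run() == "thrift://real-metastore:9083"
```

If this reading is right, the fix is exactly what the comment proposes: build the {{HiveConf}} from the metadata client and hand that to the underlying services instead of the execution client's conf.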
[jira] [Closed] (SPARK-22358) ClassNotFoundException at NettyRpcEnv#deserialize when running Spark on Yarn-Cluster
[ https://issues.apache.org/jira/browse/SPARK-22358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Congxian Qiu closed SPARK-22358.
[jira] [Resolved] (SPARK-22358) ClassNotFoundException at NettyRpcEnv#deserialize when running Spark on Yarn-Cluster
[ https://issues.apache.org/jira/browse/SPARK-22358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-22358. --- Resolution: Cannot Reproduce You have some strange corruption in your code or build: RpcAddvess. You would need to reproduce it on master or at least a supported branch. You say you can't reproduce this, so this should not be a JIRA.
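Sean Owen's corruption diagnosis is easy to sanity-check: the class name in the trace, {{RpcAddvess}}, differs from the real {{org.apache.spark.rpc.RpcAddress}} by exactly one character, and that character pair differs by exactly one bit. A quick check (plain Python, just comparing the two strings):

```python
# Compare the class name from the stack trace with the real Spark class name.
real = "RpcAddress"
seen = "RpcAddvess"
diffs = [(i, a, b) for i, (a, b) in enumerate(zip(real, seen)) if a != b]
print(diffs)  # [(6, 'r', 'v')] -- a single differing character
i, a, b = diffs[0]
flipped_bits = bin(ord(a) ^ ord(b)).count("1")
print(flipped_bits)  # 1 -- 'r' (0x72) and 'v' (0x76) differ in one bit (0x04)
```

A one-bit difference inside an otherwise valid serialized class name is the classic signature of memory, disk, or network corruption rather than a Spark bug, which supports the "Cannot Reproduce" resolution.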
[jira] [Created] (SPARK-22358) ClassNotFoundException at NettyRpcEnv#deserialize when running Spark on Yarn-Cluster
Congxian Qiu created SPARK-22358: Summary: ClassNotFoundException at NettyRpcEnv#deserialize when running Spark on Yarn-Cluster Key: SPARK-22358 URL: https://issues.apache.org/jira/browse/SPARK-22358 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.6.2 Reporter: Congxian Qiu When running a Spark Streaming program in Yarn-Cluster mode, I received the following ERROR message, and the program hung. I found [SPARK-10986](https://issues.apache.org/jira/browse/SPARK-10986), which is similar, but I can't reproduce the error {noformat} [2017-10-26 16:53:18,274] ERROR Error while invoking RpcHandler#receive() for one-way message. (org.apache.spark.network.server.TransportRequestHandler) java.lang.ClassNotFoundException: org.apache.spark.rpc.RpcAddvess (stack trace identical to the one quoted above) {noformat}
[jira] [Commented] (SPARK-21725) spark thriftserver insert overwrite table partition select
[ https://issues.apache.org/jira/browse/SPARK-21725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220239#comment-16220239 ] xinzhang commented on SPARK-21725: -- I tried Spark (master version) on 21 Aug 2017, and the problem still appeared. I will try it again now and reply with the result I get. Thanks for your reply. [~mgaido] [~srowen] > spark thriftserver insert overwrite table partition select > --- > > Key: SPARK-21725 > URL: https://issues.apache.org/jira/browse/SPARK-21725 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 > Environment: centos 6.7 spark 2.1 jdk8 >Reporter: xinzhang > Labels: spark-sql > > Use the thriftserver to create tables with partitions. > session 1: > SET hive.default.fileformat=Parquet;create table tmp_10(count bigint) > partitioned by (pt string) stored as parquet; > --ok > !exit > session 2: > SET hive.default.fileformat=Parquet;create table tmp_11(count bigint) > partitioned by (pt string) stored as parquet; > --ok > !exit > session 3: > --connect the thriftserver > SET hive.default.fileformat=Parquet;insert overwrite table tmp_10 > partition(pt='1') select count(1) count from tmp_11; > --ok > !exit > session 4(do it again): > --connect the thriftserver > SET hive.default.fileformat=Parquet;insert overwrite table tmp_10 > partition(pt='1') select count(1) count from tmp_11; > --error > !exit > - > 17/08/14 18:13:42 ERROR SparkExecuteStatementOperation: Error executing > query, currentState RUNNING, > java.lang.reflect.InvocationTargetException > .. > .. 
> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to move > source > hdfs://dc-hadoop54:50001/group/user/user1/meta/hive-temp-table/user1.db/tmp_11/.hive-staging_hive_2017-08-14_18-13-39_035_6303339779053 > 512282-2/-ext-1/part-0 to destination > hdfs://dc-hadoop54:50001/group/user/user1/meta/hive-temp-table/user1.db/tmp_11/pt=1/part-0 > at org.apache.hadoop.hive.ql.metadata.Hive.moveFile(Hive.java:2644) > at org.apache.hadoop.hive.ql.metadata.Hive.copyFiles(Hive.java:2711) > at > org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1403) > at > org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1324) > ... 45 more > Caused by: java.io.IOException: Filesystem closed > > - > The docs on Parquet tables are here: > http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files > Hive metastore Parquet table conversion > When reading from and writing to Hive metastore Parquet tables, Spark SQL > will try to use its own Parquet support instead of Hive SerDe for better > performance. This behavior is controlled by the > spark.sql.hive.convertMetastoreParquet configuration, and is turned on by > default. > I am confused: the problem appears with partitioned tables but not with > unpartitioned tables. Does that mean Spark does not use its own Parquet support? > Could someone suggest how I could avoid the issue? -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
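The {{java.io.IOException: Filesystem closed}} at the bottom of the trace suggests one plausible failure mode: Hadoop's {{FileSystem.get()}} caches one instance per URI and user, so a handle closed when one session ends can break a later session that received the same cached object. The sketch below is plain Python, not Hadoop code; the class and function names are invented stand-ins used only to illustrate the shared-cache pattern:

```python
# Minimal sketch (not Spark/Hadoop code) of the failure mode hinted at by
# "java.io.IOException: Filesystem closed": a process-wide cache hands the
# same filesystem handle to every session, so one session closing it breaks
# the others.
class FileSystem:
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

    def rename(self, src, dst):
        if self.closed:
            raise IOError("Filesystem closed")
        return True

_cache = {}

def get_filesystem(uri):
    # mimics the one-shared-instance-per-URI behavior of a filesystem cache
    return _cache.setdefault(uri, FileSystem())

session1 = get_filesystem("hdfs://dc-hadoop54:50001")
session2 = get_filesystem("hdfs://dc-hadoop54:50001")
session1.close()  # the first session ends and closes the shared handle
try:
    # the second session's INSERT OVERWRITE tries to move the staging file
    session2.rename(".hive-staging/part-0", "pt=1/part-0")
except IOError as e:
    print(e)  # Filesystem closed
```

If this is the mechanism at play, setting {{fs.hdfs.impl.disable.cache=true}} is a commonly suggested workaround, at the cost of opening more filesystem connections; whether it applies to this exact report is an assumption, not something the thread confirms.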
[jira] [Commented] (SPARK-21725) spark thriftserver insert overwrite table partition select
[ https://issues.apache.org/jira/browse/SPARK-21725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220228#comment-16220228 ] Marco Gaido commented on SPARK-21725: - Please try with the master branch, not with Spark 2.1.2. I used that and I was unable to reproduce the issue. If you manage to reproduce the issue on the current master, then maybe I am doing something wrong trying to reproduce it, even though the steps you posted are pretty precise; in that case, I'd ask you to give more information about the configuration and to check the exact steps to reproduce it. Otherwise, the only suggestion I can give is to upgrade to 2.3.0 as soon as it is available.
[jira] [Commented] (SPARK-21725) spark thriftserver insert overwrite table partition select
[ https://issues.apache.org/jira/browse/SPARK-21725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220225#comment-16220225 ] Sean Owen commented on SPARK-21725: --- [~zhangxin0112zx] there's no reason to expect 2.1.2 to be different. He's asking you to try the current master branch.
[jira] [Commented] (SPARK-21725) spark thriftserver insert overwrite table partition select
[ https://issues.apache.org/jira/browse/SPARK-21725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220216#comment-16220216 ] xinzhang commented on SPARK-21725: -- I downloaded Spark 2.1.2, and the problem still appears. Could you give me any suggestions for avoiding the problem? [~mgaido]
[jira] [Comment Edited] (SPARK-15905) Driver hung while writing to console progress bar
[ https://issues.apache.org/jira/browse/SPARK-15905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220167#comment-16220167 ] Denis Gabaydulin edited comment on SPARK-15905 at 10/26/17 8:48 AM: SPARK 2.1.0 CentOS Linux release 7.3.1611 (Core) jdk180_64_102 I'm not sure I've got the same issue, but I have at least two threads which are blocked on a logger. The first is the main thread (where I called the unpersist() method). {noformat} Thread 30581: (state = BLOCKED) - org.apache.log4j.Category.callAppenders(org.apache.log4j.spi.LoggingEvent) @bci=12, line=204 (Compiled frame) - org.apache.log4j.Category.forcedLog(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=14, line=391 (Compiled frame) - org.apache.log4j.Category.log(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=34, line=856 (Compiled frame) - org.slf4j.impl.Log4jLoggerAdapter.warn(java.lang.String, java.lang.Throwable) @bci=12, line=479 (Interpreted frame) - org.apache.spark.internal.Logging$class.logWarning(org.apache.spark.internal.Logging, scala.Function0, java.lang.Throwable) @bci=30, line=87 (Interpreted frame) - org.apache.spark.rpc.RpcEndpointRef.logWarning(scala.Function0, java.lang.Throwable) @bci=3, line=30 (Interpreted frame) - org.apache.spark.rpc.RpcEndpointRef.askWithRetry(java.lang.Object, org.apache.spark.rpc.RpcTimeout, scala.reflect.ClassTag) @bci=32, line=111 (Interpreted frame) - org.apache.spark.rpc.RpcEndpointRef.askWithRetry(java.lang.Object, scala.reflect.ClassTag) @bci=7, line=78 (Compiled frame) - org.apache.spark.storage.BlockManagerMaster.removeRdd(int, boolean) @bci=21, line=119 (Compiled frame) - org.apache.spark.SparkContext.unpersistRDD(int, boolean) @bci=12, line=1705 (Compiled frame) - org.apache.spark.rdd.RDD.unpersist(boolean) @bci=21, line=216 (Interpreted frame) - org.apache.spark.sql.execution.CacheManager$$anonfun$uncacheQuery$1.apply$mcZ$sp() @bci=70, line=116 (Interpreted frame) - 
org.apache.spark.sql.execution.CacheManager$$anonfun$uncacheQuery$1.apply() @bci=1, line=111 (Interpreted frame) - org.apache.spark.sql.execution.CacheManager$$anonfun$uncacheQuery$1.apply() @bci=1, line=111 (Interpreted frame) - org.apache.spark.sql.execution.CacheManager.writeLock(scala.Function0) @bci=13, line=65 (Compiled frame) - org.apache.spark.sql.execution.CacheManager.uncacheQuery(org.apache.spark.sql.Dataset, boolean) @bci=11, line=111 (Interpreted frame) - org.apache.spark.sql.Dataset.unpersist(boolean) @bci=12, line=2526 (Interpreted frame) - org.apache.spark.sql.Dataset.unpersist() @bci=2, line=2536 (Interpreted frame) - ru.ok.dwh.analytics.user.kpi.service.KpiBaseMetricDailyAggregator.complete(boolean) @bci=4, line=68 (Interpreted frame) - ru.ok.dwh.analytics.service.v2.BaseSparkDatasetTransformation.complete() @bci=2, line=70 (Interpreted frame) - ru.ok.dwh.analytics.application.StandardApplication.run(java.lang.String[]) @bci=232, line=109 (Interpreted frame) - ru.ok.dwh.analytics.application.kpi.KpiVideoBaseMetricApp.main(java.lang.String[]) @bci=51, line=53 (Interpreted frame) - sun.reflect.NativeMethodAccessorImpl.invoke0(java.lang.reflect.Method, java.lang.Object, java.lang.Object[]) @bci=0 (Interpreted frame) - sun.reflect.NativeMethodAccessorImpl.invoke(java.lang.Object, java.lang.Object[]) @bci=100, line=62 (Interpreted frame) - sun.reflect.DelegatingMethodAccessorImpl.invoke(java.lang.Object, java.lang.Object[]) @bci=6, line=43 (Interpreted frame) - java.lang.reflect.Method.invoke(java.lang.Object, java.lang.Object[]) @bci=56, line=498 (Interpreted frame) - org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(scala.collection.Seq, scala.collection.Seq, scala.collection.mutable.Map, java.lang.String, boolean) @bci=663, line=738 (Interpreted frame) - org.apache.spark.deploy.SparkSubmit$.doRunMain$1(org.apache.spark.deploy.SparkSubmitArguments, scala.collection.Seq, scala.collection.Seq, 
scala.collection.mutable.Map, java.lang.String) @bci=18, line=187 (Interpreted frame) - org.apache.spark.deploy.SparkSubmit$.submit(org.apache.spark.deploy.SparkSubmitArguments) @bci=245, line=212 (Interpreted frame) - org.apache.spark.deploy.SparkSubmit$.main(java.lang.String[]) @bci=76, line=126 (Interpreted frame) - org.apache.spark.deploy.SparkSubmit.main(java.lang.String[]) @bci=4 (Interpreted frame) {noformat} And, a couple of spark internal methods {noformat} Thread 30910: (state = BLOCKED) - org.apache.log4j.Category.callAppenders(org.apache.log4j.spi.LoggingEvent) @bci=12, line=204 (Compiled frame) - org.apache.log4j.Category.forcedLog(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=14, line=391 (Compiled frame) - org.apache.log4j.Category.log(java.lang.String, org.apache.log4j.Priority,
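Both dumps above show threads blocked inside log4j's synchronized Category.callAppenders while the console progress bar is also writing to the console. A commonly suggested mitigation for this class of hang (hedged: it is not confirmed as the fix for this particular report) is simply to disable the progress bar:

```
# spark-defaults.conf fragment, or pass as
#   spark-submit --conf spark.ui.showConsoleProgress=false ...
spark.ui.showConsoleProgress  false
```

This removes the console-writing thread from contention and can at least help isolate whether the progress bar is involved.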
[jira] [Commented] (SPARK-22357) SparkContext.binaryFiles ignore minPartitions parameter
[ https://issues.apache.org/jira/browse/SPARK-22357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220169#comment-16220169 ] Weichen Xu commented on SPARK-22357: [~jerryshao] Code formatted. This bug was reported here: https://github.com/apache/spark/pull/19439 You can ask [~imatiach] for more context. > SparkContext.binaryFiles ignore minPartitions parameter > --- > > Key: SPARK-22357 > URL: https://issues.apache.org/jira/browse/SPARK-22357 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.2, 2.2.0 >Reporter: Weichen Xu > > this is a bug in binaryFiles - even though we give it the partitions, > binaryFiles ignores it. > This is a bug introduced in spark 2.1 from spark 2.0, in file > PortableDataStream.scala the argument “minPartitions” is no longer used (with > the push to master on 11/7/6): > {code} > /** > Allow minPartitions set by end-user in order to keep compatibility with old > Hadoop API > which is set through setMaxSplitSize > */ > def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: > Int) { > val defaultMaxSplitBytes = > sc.getConf.get(config.FILES_MAX_PARTITION_BYTES) > val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES) > val defaultParallelism = sc.defaultParallelism > val files = listStatus(context).asScala > val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + > openCostInBytes).sum > val bytesPerCore = totalBytes / defaultParallelism > val maxSplitSize = Math.min(defaultMaxSplitBytes, > Math.max(openCostInBytes, bytesPerCore)) > super.setMaxSplitSize(maxSplitSize) > } > {code} > The code previously, in version 2.0, was: > {code} > def setMinPartitions(context: JobContext, minPartitions: Int) { > val totalLen = > listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum > val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, > 1.0)).toLong > super.setMaxSplitSize(maxSplitSize) > } > {code} > The new code is very smart, 
but it ignores what the user passes in and uses > the data size, which is kind of a breaking change in some sense. > In our specific case this was a problem, because we initially read in just > the file names and only after that the dataframe becomes very large, when > reading in the images themselves – and in this case the new code does not > handle the partitioning very well. > I’m not sure if it can be easily fixed because I don’t understand the full > context of the change in spark (but at the very least the unused parameter > should be removed to avoid confusion).
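To make the regression concrete, the two formulas quoted above can be sketched in plain Python (not Spark code; the 128 MiB and 4 MiB constants are assumed here as the usual defaults for spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes):

```python
import math

def split_size_v20(total_len, min_partitions):
    # Spark 2.0 behavior: the user's minPartitions drives the split size.
    return int(math.ceil(total_len / max(min_partitions, 1.0)))

def split_size_v21(total_bytes, default_parallelism, min_partitions=None,
                   default_max_split_bytes=128 * 1024 * 1024,
                   open_cost_in_bytes=4 * 1024 * 1024):
    # Spark 2.1 behavior: min_partitions is accepted but never read,
    # mirroring the unused argument in the 2.1 setMinPartitions above.
    bytes_per_core = total_bytes // default_parallelism
    return min(default_max_split_bytes, max(open_cost_in_bytes, bytes_per_core))

total = 1 << 30  # 1 GiB of input
# 2.0: asking for 1000 partitions yields ~1 MiB splits, i.e. ~1000 partitions
print(split_size_v20(total, 1000))
# 2.1: the same request is silently ignored; the split size is capped by the
# data-size heuristic instead (128 MiB here, so only ~8 partitions)
print(split_size_v21(total, default_parallelism=8, min_partitions=1000))
```

This is exactly the reporter's scenario: tiny input (file names) that explodes when read, where only the explicit minPartitions hint could have produced enough partitions.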
[jira] [Updated] (SPARK-22357) SparkContext.binaryFiles ignore minPartitions parameter
[ https://issues.apache.org/jira/browse/SPARK-22357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weichen Xu updated SPARK-22357: --- Description: this is a bug in binaryFiles - even though we give it the partitions, binaryFiles ignores it. This is a bug introduced in spark 2.1 from spark 2.0, in file PortableDataStream.scala the argument “minPartitions” is no longer used (with the push to master on 11/7/6): {code} /** Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API which is set through setMaxSplitSize */ def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) { val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES) val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES) val defaultParallelism = sc.defaultParallelism val files = listStatus(context).asScala val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum val bytesPerCore = totalBytes / defaultParallelism val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) super.setMaxSplitSize(maxSplitSize) } {code} The code previously, in version 2.0, was: {code} def setMinPartitions(context: JobContext, minPartitions: Int) { val totalLen = listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong super.setMaxSplitSize(maxSplitSize) } {code} The new code is very smart, but it ignores what the user passes in and uses the data size, which is kind of a breaking change in some sense. In our specific case this was a problem, because we initially read in just the file names and only after that the dataframe becomes very large, when reading in the images themselves – and in this case the new code does not handle the partitioning very well. 
I’m not sure if it can be easily fixed because I don’t understand the full context of the change in spark (but at the very least the unused parameter should be removed to avoid confusion). was: this is a bug in binaryFiles - even though we give it the partitions, binaryFiles ignores it. This is a bug introduced in spark 2.1 from spark 2.0, in file PortableDataStream.scala the argument “minPartitions” is no longer used (with the push to master on 11/7/6): /** Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API which is set through setMaxSplitSize */ def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) { val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES) val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES) val defaultParallelism = sc.defaultParallelism val files = listStatus(context).asScala val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum val bytesPerCore = totalBytes / defaultParallelism val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) super.setMaxSplitSize(maxSplitSize) } The code previously, in version 2.0, was: def setMinPartitions(context: JobContext, minPartitions: Int) { val totalLen = listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong super.setMaxSplitSize(maxSplitSize) } The new code is very smart, but it ignores what the user passes in and uses the data size, which is kind of a breaking change in some sense. In our specific case this was a problem, because we initially read in just the file names and only after that the dataframe becomes very large, when reading in the images themselves – and in this case the new code does not handle the partitioning very well. 
I’m not sure if it can be easily fixed because I don’t understand the full context of the change in spark (but at the very least the unused parameter should be removed to avoid confusion). > SparkContext.binaryFiles ignore minPartitions parameter > --- > > Key: SPARK-22357 > URL: https://issues.apache.org/jira/browse/SPARK-22357 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.2, 2.2.0 >Reporter: Weichen Xu > > this is a bug in binaryFiles - even though we give it the partitions, > binaryFiles ignores it. > This is a bug introduced in spark 2.1 from spark 2.0, in file > PortableDataStream.scala the argument “minPartitions” is no longer used (with > the push to master on 11/7/6): > {code} > /** > Allow minPartitions set by end-user in order to keep compatibility with old > Hadoop API > which is set through setMaxSplitSize > */ > def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: > Int) { > val defaultMaxSplitBytes
[jira] [Commented] (SPARK-22357) SparkContext.binaryFiles ignore minPartitions parameter
[ https://issues.apache.org/jira/browse/SPARK-22357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220160#comment-16220160 ] Saisai Shao commented on SPARK-22357: - [~WeichenXu123] would you please format the code in the JIRA description to make it easier to read :). > SparkContext.binaryFiles ignore minPartitions parameter > --- > > Key: SPARK-22357 > URL: https://issues.apache.org/jira/browse/SPARK-22357 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.2, 2.2.0 >Reporter: Weichen Xu > > this is a bug in binaryFiles - even though we give it the partitions, > binaryFiles ignores it. > This is a bug introduced in spark 2.1 from spark 2.0, in file > PortableDataStream.scala the argument “minPartitions” is no longer used (with > the push to master on 11/7/6): > /** > Allow minPartitions set by end-user in order to keep compatibility with old > Hadoop API > which is set through setMaxSplitSize > */ > def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: > Int) { > val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES) > val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES) > val defaultParallelism = sc.defaultParallelism > val files = listStatus(context).asScala > val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + > openCostInBytes).sum > val bytesPerCore = totalBytes / defaultParallelism > val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, > bytesPerCore)) > super.setMaxSplitSize(maxSplitSize) > } > The code previously, in version 2.0, was: > def setMinPartitions(context: JobContext, minPartitions: Int) { > val totalLen = > listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum > val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong > super.setMaxSplitSize(maxSplitSize) > } > The new code is very smart, but it ignores what the user passes in and uses > the data size, which is kind of a 
breaking change in some sense. > In our specific case this was a problem, because we initially read in just > the file names and only after that the dataframe becomes very large, when > reading in the images themselves – and in this case the new code does not > handle the partitioning very well. > I’m not sure if it can be easily fixed because I don’t understand the full > context of the change in spark (but at the very least the unused parameter > should be removed to avoid confusion).
[jira] [Commented] (SPARK-22357) SparkContext.binaryFiles ignore minPartitions parameter
[ https://issues.apache.org/jira/browse/SPARK-22357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220152#comment-16220152 ] Weichen Xu commented on SPARK-22357: cc [~jerryshao] > SparkContext.binaryFiles ignore minPartitions parameter > --- > > Key: SPARK-22357 > URL: https://issues.apache.org/jira/browse/SPARK-22357 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.2, 2.2.0 >Reporter: Weichen Xu > > this is a bug in binaryFiles - even though we give it the partitions, > binaryFiles ignores it. > This is a bug introduced in spark 2.1 from spark 2.0, in file > PortableDataStream.scala the argument “minPartitions” is no longer used (with > the push to master on 11/7/6): > /** > Allow minPartitions set by end-user in order to keep compatibility with old > Hadoop API > which is set through setMaxSplitSize > */ > def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: > Int) { > val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES) > val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES) > val defaultParallelism = sc.defaultParallelism > val files = listStatus(context).asScala > val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + > openCostInBytes).sum > val bytesPerCore = totalBytes / defaultParallelism > val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, > bytesPerCore)) > super.setMaxSplitSize(maxSplitSize) > } > The code previously, in version 2.0, was: > def setMinPartitions(context: JobContext, minPartitions: Int) { > val totalLen = > listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum > val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong > super.setMaxSplitSize(maxSplitSize) > } > The new code is very smart, but it ignores what the user passes in and uses > the data size, which is kind of a breaking change in some sense > In our specific case this was a problem, because we 
initially read in just > the file names and only after that the dataframe becomes very large, when > reading in the images themselves – and in this case the new code does not > handle the partitioning very well. > I’m not sure if it can be easily fixed because I don’t understand the full > context of the change in spark (but at the very least the unused parameter > should be removed to avoid confusion).
[jira] [Commented] (SPARK-22357) SparkContext.binaryFiles ignore minPartitions parameter
[ https://issues.apache.org/jira/browse/SPARK-22357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220147#comment-16220147 ] Weichen Xu commented on SPARK-22357: cc [~imatiach] [~liancheng] > SparkContext.binaryFiles ignore minPartitions parameter > --- > > Key: SPARK-22357 > URL: https://issues.apache.org/jira/browse/SPARK-22357 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.2, 2.2.0 >Reporter: Weichen Xu > > this is a bug in binaryFiles - even though we give it the partitions, > binaryFiles ignores it. > This is a bug introduced in spark 2.1 from spark 2.0, in file > PortableDataStream.scala the argument “minPartitions” is no longer used (with > the push to master on 11/7/6): > /** > Allow minPartitions set by end-user in order to keep compatibility with old > Hadoop API > which is set through setMaxSplitSize > */ > def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: > Int) { > val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES) > val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES) > val defaultParallelism = sc.defaultParallelism > val files = listStatus(context).asScala > val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + > openCostInBytes).sum > val bytesPerCore = totalBytes / defaultParallelism > val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, > bytesPerCore)) > super.setMaxSplitSize(maxSplitSize) > } > The code previously, in version 2.0, was: > def setMinPartitions(context: JobContext, minPartitions: Int) { > val totalLen = > listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum > val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong > super.setMaxSplitSize(maxSplitSize) > } > The new code is very smart, but it ignores what the user passes in and uses > the data size, which is kind of a breaking change in some sense > In our specific case this was a problem, 
because we initially read in just > the file names and only after that the dataframe becomes very large, when > reading in the images themselves – and in this case the new code does not > handle the partitioning very well. > I’m not sure if it can be easily fixed because I don’t understand the full > context of the change in spark (but at the very least the unused parameter > should be removed to avoid confusion).
[jira] [Created] (SPARK-22357) SparkContext.binaryFiles ignore minPartitions parameter
Weichen Xu created SPARK-22357: -- Summary: SparkContext.binaryFiles ignore minPartitions parameter Key: SPARK-22357 URL: https://issues.apache.org/jira/browse/SPARK-22357 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.2.0, 2.1.2 Reporter: Weichen Xu this is a bug in binaryFiles - even though we give it the partitions, binaryFiles ignores it. This is a bug introduced in spark 2.1 from spark 2.0, in file PortableDataStream.scala the argument “minPartitions” is no longer used (with the push to master on 11/7/6): /** Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API which is set through setMaxSplitSize */ def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) { val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES) val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES) val defaultParallelism = sc.defaultParallelism val files = listStatus(context).asScala val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum val bytesPerCore = totalBytes / defaultParallelism val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) super.setMaxSplitSize(maxSplitSize) } The code previously, in version 2.0, was: def setMinPartitions(context: JobContext, minPartitions: Int) { val totalLen = listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong super.setMaxSplitSize(maxSplitSize) } The new code is very smart, but it ignores what the user passes in and uses the data size, which is kind of a breaking change in some sense. In our specific case this was a problem, because we initially read in just the file names and only after that the dataframe becomes very large, when reading in the images themselves – and in this case the new code does not handle the partitioning very well. 
I'm not sure this can be fixed easily, because I don't understand the full context of the change in Spark, but at the very least the unused parameter should be removed to avoid confusion. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-21840) Allow multiple SparkSubmit invocations in same JVM without polluting system properties
[ https://issues.apache.org/jira/browse/SPARK-21840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Saisai Shao reassigned SPARK-21840: --- Assignee: Marcelo Vanzin > Allow multiple SparkSubmit invocations in same JVM without polluting system > properties > -- > > Key: SPARK-21840 > URL: https://issues.apache.org/jira/browse/SPARK-21840 > Project: Spark > Issue Type: New Feature > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: Marcelo Vanzin >Assignee: Marcelo Vanzin >Priority: Minor > Fix For: 2.3.0 > > > Filing this as a sub-task of SPARK-11035; this feature was discussed as part > of the PR currently attached to that bug. > Basically, to allow the launcher library to run applications in-process, the > easiest way is for it to run the {{SparkSubmit}} class. But that class > currently propagates configuration to applications by modifying system > properties. > That means that when launching multiple applications in that manner in the > same JVM, the configuration of the first application may leak into the second > application (or to any other invocation of `new SparkConf()` for that matter). > This feature is about breaking out the fix for this particular issue from the > PR linked to SPARK-11035. With the changes in SPARK-21728, the implementation > can even be further enhanced by providing an actual {{SparkConf}} instance to > the application, instead of opaque maps.
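The leak described in the issue comes from publishing per-application config through process-global state. A minimal stand-alone simulation of the pattern (plain Python, no Spark; a module-level dict stands in for JVM system properties, and all names are illustrative) shows why the second in-process launch sees the first launch's settings:

```python
# Stand-in for java.lang.System properties: one mutable store per process.
system_properties = {}

def submit(app_conf):
    # Old SparkSubmit-style behavior: propagate config by mutating global state.
    system_properties.update(app_conf)

def new_conf():
    # Like `new SparkConf()`: picks up whatever is in the global store.
    return dict(system_properties)

submit({"spark.app.name": "app1", "spark.executor.memory": "8g"})
submit({"spark.app.name": "app2"})  # second launch in the same process

conf2 = new_conf()
# app2 never set executor memory, yet it inherits app1's value:
print(conf2["spark.executor.memory"])  # prints "8g": the leak
```

The fix direction described in the issue is the inverse of this sketch: hand each application its own explicit conf object instead of routing values through the shared store.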
[jira] [Resolved] (SPARK-21840) Allow multiple SparkSubmit invocations in same JVM without polluting system properties
[ https://issues.apache.org/jira/browse/SPARK-21840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Saisai Shao resolved SPARK-21840. - Resolution: Fixed Fix Version/s: 2.3.0 Issue resolved by pull request 19519 [https://github.com/apache/spark/pull/19519] > Allow multiple SparkSubmit invocations in same JVM without polluting system > properties > -- > > Key: SPARK-21840 > URL: https://issues.apache.org/jira/browse/SPARK-21840 > Project: Spark > Issue Type: New Feature > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: Marcelo Vanzin >Priority: Minor > Fix For: 2.3.0 > > > Filing this as a sub-task of SPARK-11035; this feature was discussed as part > of the PR currently attached to that bug. > Basically, to allow the launcher library to run applications in-process, the > easiest way is for it to run the {{SparkSubmit}} class. But that class > currently propagates configuration to applications by modifying system > properties. > That means that when launching multiple applications in that manner in the > same JVM, the configuration of the first application may leak into the second > application (or to any other invocation of `new SparkConf()` for that matter). > This feature is about breaking out the fix for this particular issue from the > PR linked to SPARK-11035. With the changes in SPARK-21728, the implementation > can even be further enhanced by providing an actual {{SparkConf}} instance to > the application, instead of opaque maps.
[jira] [Resolved] (SPARK-22308) Support unit tests of spark code using ScalaTest using suites other than FunSuite
[ https://issues.apache.org/jira/browse/SPARK-22308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li resolved SPARK-22308. - Resolution: Fixed Assignee: Nathan Kronenfeld Fix Version/s: 2.3.0 > Support unit tests of spark code using ScalaTest using suites other than > FunSuite > - > > Key: SPARK-22308 > URL: https://issues.apache.org/jira/browse/SPARK-22308 > Project: Spark > Issue Type: Improvement > Components: Documentation, Spark Core, SQL, Tests >Affects Versions: 2.2.0 >Reporter: Nathan Kronenfeld >Assignee: Nathan Kronenfeld >Priority: Minor > Labels: scalatest, test-suite, test_issue > Fix For: 2.3.0 > > > External codebases that have Spark code can test it using SharedSparkContext > no matter how they write their ScalaTest suites, whether based on FunSuite, > FunSpec, FlatSpec, or WordSpec. > SharedSQLContext, however, only supports FunSuite.
[jira] [Commented] (SPARK-22344) Prevent R CMD check from using /tmp
[ https://issues.apache.org/jira/browse/SPARK-22344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220083#comment-16220083 ] Felix Cheung commented on SPARK-22344: -- why is hive there when enableHiveSupport should be off? what do we do with .cache/? https://stackoverflow.com/questions/76327/how-can-i-prevent-java-from-creating-hsperfdata-files > Prevent R CMD check from using /tmp > --- > > Key: SPARK-22344 > URL: https://issues.apache.org/jira/browse/SPARK-22344 > Project: Spark > Issue Type: Bug > Components: SparkR >Affects Versions: 1.6.3, 2.1.2, 2.2.0, 2.3.0 >Reporter: Shivaram Venkataraman > > When R CMD check is run on the SparkR package it leaves behind files in /tmp > which is a violation of CRAN policy. We should instead write to Rtmpdir. > Notes from CRAN are below > {code} > Checking this leaves behind dirs >hive/$USER >$USER > and files named like >b4f6459b-0624-4100-8358-7aa7afbda757_resources > in /tmp, in violation of the CRAN Policy. > {code}
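The general remedy the issue asks for, pointing scratch-file creation at a caller-controlled directory (for SparkR, R's Rtmpdir) instead of the process-wide /tmp default, can be sketched with the Python standard library; the directory layout and file names below are purely illustrative:

```python
# Sketch: redirect scratch files into a sandboxed directory that is removed
# on exit, so a check run leaves nothing behind in /tmp.
import os
import tempfile

with tempfile.TemporaryDirectory(prefix="check-scratch-") as scratch:
    # Hand `scratch` to anything that would otherwise default to /tmp:
    path = os.path.join(scratch, "resources.db")
    with open(path, "w") as f:
        f.write("intermediate state")
    assert os.path.exists(path)

# On exiting the `with` block the whole directory is deleted, residue and all.
print(os.path.exists(scratch))  # False
```

The same idea applies to the JVM side of the comment above: each temp location (Hive scratch dirs, block manager dirs, hsperfdata) needs an explicit override rather than relying on the system default.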
[jira] [Comment Edited] (SPARK-22344) Prevent R CMD check from using /tmp
[ https://issues.apache.org/jira/browse/SPARK-22344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220080#comment-16220080 ] Felix Cheung edited comment on SPARK-22344 at 10/26/17 7:25 AM: this is what I see on a clean machine tracking access/create/modify to file system. I run this command: R CMD check --as-cran SparkR_2.1.2.tar.gz And matching the report these are accessed: /tmp/hive/ /tmp/hive/ubuntu/8201eb2c-8065-458c-b564-1e61b3dc5b7d/_tmp_space.db/ /tmp/ubuntu/8201eb2c-8065-458c-b564-1e61b3dc5b7d/ But these are also there: ~/.cache /tmp/hsperfdata_ubuntu/ /tmp/1993ae7a-f553-4de5-9f74-6c8393e3cd5a_resources/ /tmp/8201eb2c-8065-458c-b564-1e61b3dc5b7d_resources/ And this is created and deleted: /tmp/blockmgr-b27976f3-b66a-44e1-94c2-7360525af321/ was (Author: felixcheung): this is what I see on a clean machine tracking access/create/modify to file system. I run this command: R CMD check --as-cran SparkR_2.1.2.tar.gz And matching the report these are accessed: /tmp/hive/ /tmp/hive/ubuntu/8201eb2c-8065-458c-b564-1e61b3dc5b7d/_tmp_space.db/ /tmp/ubuntu/8201eb2c-8065-458c-b564-1e61b3dc5b7d/ But these are also there: ~/.cache /tmp/blockmgr-b27976f3-b66a-44e1-94c2-7360525af321/ /tmp/hsperfdata_ubuntu/ /tmp/1993ae7a-f553-4de5-9f74-6c8393e3cd5a_resources/ /tmp/8201eb2c-8065-458c-b564-1e61b3dc5b7d_resources/ > Prevent R CMD check from using /tmp > --- > > Key: SPARK-22344 > URL: https://issues.apache.org/jira/browse/SPARK-22344 > Project: Spark > Issue Type: Bug > Components: SparkR >Affects Versions: 1.6.3, 2.1.2, 2.2.0, 2.3.0 >Reporter: Shivaram Venkataraman > > When R CMD check is run on the SparkR package it leaves behind files in /tmp > which is a violation of CRAN policy. We should instead write to Rtmpdir. > Notes from CRAN are below > {code} > Checking this leaves behind dirs >hive/$USER >$USER > and files named like >b4f6459b-0624-4100-8358-7aa7afbda757_resources > in /tmp, in violation of the CRAN Policy. 
> {code}
[jira] [Commented] (SPARK-22344) Prevent R CMD check from using /tmp
[ https://issues.apache.org/jira/browse/SPARK-22344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220080#comment-16220080 ] Felix Cheung commented on SPARK-22344: -- this is what I see on a clean machine tracking access/create/modify to file system. I run this command: R CMD check --as-cran SparkR_2.1.2.tar.gz And matching the report these are accessed: /tmp/hive/ /tmp/hive/ubuntu/8201eb2c-8065-458c-b564-1e61b3dc5b7d/_tmp_space.db/ /tmp/ubuntu/8201eb2c-8065-458c-b564-1e61b3dc5b7d/ But these are also there: ~/.cache /tmp/blockmgr-b27976f3-b66a-44e1-94c2-7360525af321/ /tmp/hsperfdata_ubuntu/ /tmp/1993ae7a-f553-4de5-9f74-6c8393e3cd5a_resources/ /tmp/8201eb2c-8065-458c-b564-1e61b3dc5b7d_resources/ > Prevent R CMD check from using /tmp > --- > > Key: SPARK-22344 > URL: https://issues.apache.org/jira/browse/SPARK-22344 > Project: Spark > Issue Type: Bug > Components: SparkR >Affects Versions: 1.6.3, 2.1.2, 2.2.0, 2.3.0 >Reporter: Shivaram Venkataraman > > When R CMD check is run on the SparkR package it leaves behind files in /tmp > which is a violation of CRAN policy. We should instead write to Rtmpdir. > Notes from CRAN are below > {code} > Checking this leaves behind dirs >hive/$USER >$USER > and files named like >b4f6459b-0624-4100-8358-7aa7afbda757_resources > in /tmp, in violation of the CRAN Policy. > {code}