[jira] [Commented] (SPARK-23589) Add interpreted execution for ExternalMapToCatalyst expression
[ https://issues.apache.org/jira/browse/SPARK-23589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392536#comment-16392536 ] Takeshi Yamamuro commented on SPARK-23589: -- I'll make a pr after I finish other sub-tickets: https://github.com/apache/spark/compare/master...maropu:SPARK-23589 > Add interpreted execution for ExternalMapToCatalyst expression > -- > > Key: SPARK-23589 > URL: https://issues.apache.org/jira/browse/SPARK-23589 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Herman van Hovell >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-22246) UnsafeRow, UnsafeArrayData, and UnsafeMapData use MemoryBlock
[ https://issues.apache.org/jira/browse/SPARK-22246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kazuaki Ishizaki updated SPARK-22246: - Issue Type: Sub-task (was: Improvement) Parent: SPARK-10399 > UnsafeRow, UnsafeArrayData, and UnsafeMapData use MemoryBlock > - > > Key: SPARK-22246 > URL: https://issues.apache.org/jira/browse/SPARK-22246 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.2.0 >Reporter: Kazuaki Ishizaki >Priority: Major > > Using {{MemoryBlock}} can improve the flexibility of choosing the memory type and > the runtime performance of memory accesses with {{Unsafe}}. > This JIRA entry tries to use {{MemoryBlock}} in {{UnsafeRow}}, > {{UnsafeArrayData}}, and {{UnsafeMapData}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23632) sparkR.session() error with spark packages - JVM is not ready after 10 seconds
[ https://issues.apache.org/jira/browse/SPARK-23632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392507#comment-16392507 ] Felix Cheung commented on SPARK-23632: -- To clarify, are you running into a problem because the package download is taking longer than the fixed 10 sec? > sparkR.session() error with spark packages - JVM is not ready after 10 seconds > -- > > Key: SPARK-23632 > URL: https://issues.apache.org/jira/browse/SPARK-23632 > Project: Spark > Issue Type: Bug > Components: SparkR >Affects Versions: 2.2.0, 2.2.1, 2.3.0 >Reporter: Jaehyeon Kim >Priority: Minor > > Hi > When I execute _sparkR.session()_ with _org.apache.hadoop:hadoop-aws:2.8.2_ > as follows, > {code:java} > library(SparkR, lib.loc=file.path(Sys.getenv('SPARK_HOME'),'R', 'lib')) > ext_opts <- '-Dhttp.proxyHost=10.74.1.25 -Dhttp.proxyPort=8080 > -Dhttps.proxyHost=10.74.1.25 -Dhttps.proxyPort=8080' > sparkR.session(master = "spark://master:7077", >appName = 'ml demo', >sparkConfig = list(spark.driver.memory = '2g'), >sparkPackages = 'org.apache.hadoop:hadoop-aws:2.8.2', >spark.driver.extraJavaOptions = ext_opts) > {code} > I see a *JVM is not ready after 10 seconds* error. Below shows some of the log > messages. > {code:java} > Ivy Default Cache set to: /home/rstudio/.ivy2/cache > The jars for the packages stored in: /home/rstudio/.ivy2/jars > :: loading settings :: url = > jar:file:/usr/local/spark-2.2.1/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml > org.apache.hadoop#hadoop-aws added as a dependency > :: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0 > confs: [default] > found org.apache.hadoop#hadoop-aws;2.8.2 in central > ... > ... > found javax.servlet.jsp#jsp-api;2.1 in central > Error in sparkR.sparkContext(master, appName, sparkHome, sparkConfigMap, : > JVM is not ready after 10 seconds > ... > ... 
> found joda-time#joda-time;2.9.4 in central > downloading > https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.8.2/hadoop-aws-2.8.2.jar > ... > ... > ... > xmlenc#xmlenc;0.52 from central in [default] > - > | |modules|| artifacts | > | conf | number| search|dwnlded|evicted|| number|dwnlded| > - > | default | 76 | 76 | 76 | 0 || 76 | 76 | > - > :: retrieving :: org.apache.spark#spark-submit-parent > confs: [default] > 76 artifacts copied, 0 already retrieved (27334kB/56ms) > {code} > It's fine if I re-execute it after the package and its dependencies are > downloaded. > I believe it's because of this part - > https://github.com/apache/spark/blob/master/R/pkg/R/sparkR.R#L181 > {code:java} > if (!file.exists(path)) { > stop("JVM is not ready after 10 seconds") > } > {code} > I just wonder if it may be possible to update this so that a user can determine how > long to wait. > Thanks. > Regards > Jaehyeon -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
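The reporter's suggestion above amounts to replacing the hard-coded 10-second wait with a user-configurable polling loop. The actual fix would live in the R code of sparkR.R; the sketch below illustrates the idea in Scala, and the option name mentioned in the comment is an assumption, not an API the source confirms.

```scala
import java.nio.file.{Files, Paths}

object BackendWait {
  // Poll for the backend port file instead of failing after a hard-coded
  // 10 seconds. maxWaitSec would come from a user-settable option
  // (hypothetical name: spark.r.backendConnectionTimeout).
  def waitForFile(path: String, maxWaitSec: Int, pollMs: Long = 100): Boolean = {
    val deadline = System.nanoTime() + maxWaitSec * 1000000000L
    var found = Files.exists(Paths.get(path))
    while (!found && System.nanoTime() < deadline) {
      Thread.sleep(pollMs)
      found = Files.exists(Paths.get(path))
    }
    found
  }
}
```

With the timeout exposed as a session option, a slow package download would simply extend the wait instead of aborting the session.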
[jira] [Created] (SPARK-23638) Spark on k8s: spark.kubernetes.initContainer.image has no effect
maheshvra created SPARK-23638: - Summary: Spark on k8s: spark.kubernetes.initContainer.image has no effect Key: SPARK-23638 URL: https://issues.apache.org/jira/browse/SPARK-23638 Project: Spark Issue Type: Bug Components: Kubernetes Affects Versions: 2.3.0 Environment: K8 server: Ubuntu 16.04 Submission client: macOS Sierra 10.12.x Client Version: version.Info\{Major:"1", Minor:"9", GitVersion:"v1.9.3", GitCommit:"d2835416544f298c919e2ead3be3d0864b52323b", GitTreeState:"clean", BuildDate:"2018-02-07T12:22:21Z", GoVersion:"go1.9.2", Compiler:"gc", Platform:"darwin/amd64"} Server Version: version.Info\{Major:"1", Minor:"8", GitVersion:"v1.8.3", GitCommit:"f0efb3cb883751c5ffdbe6d515f3cb4fbe7b7acd", GitTreeState:"clean", BuildDate:"2017-11-08T18:27:48Z", GoVersion:"go1.8.3", Compiler:"gc", Platform:"linux/amd64"} Reporter: maheshvra Hi all - I am trying to use initContainer to download remote dependencies. To begin with, I ran a test with an initContainer which basically runs "echo hello world". However, when I triggered the pod deployment via spark-submit, I did not see any trace of initContainer execution in my Kubernetes cluster. 
{code:java} SPARK_DRIVER_MEMORY: 1g SPARK_DRIVER_CLASS: com.bigdata.App SPARK_DRIVER_ARGS: -c /opt/spark/work-dir/app/main/environments/int -w ./../../workflows/workflow_main.json -e prod -n features -v off SPARK_DRIVER_BIND_ADDRESS: SPARK_JAVA_OPT_0: -Dspark.submit.deployMode=cluster SPARK_JAVA_OPT_1: -Dspark.driver.blockManager.port=7079 SPARK_JAVA_OPT_2: -Dspark.app.name=fg-am00-raw12 SPARK_JAVA_OPT_3: -Dspark.kubernetes.container.image=docker.com/cmapp/fg-am00-raw:1.0.0 SPARK_JAVA_OPT_4: -Dspark.app.id=spark-4fa9a5ce1b1d401fa9c1e413ff030d44 SPARK_JAVA_OPT_5: -Dspark.jars=/opt/spark/jars/aws-java-sdk-1.7.4.jar,/opt/spark/jars/hadoop-aws-2.7.3.jar,/opt/spark/jars/guava-14.0.1.jar,/opt/spark/jars/SparkApp.jar,/opt/spark/jars/datacleanup-component-1.0-SNAPSHOT.jar SPARK_JAVA_OPT_6: -Dspark.driver.port=7078 SPARK_JAVA_OPT_7: -Dspark.kubernetes.initContainer.image=docker.com/cmapp/custombusybox:1.0.0 SPARK_JAVA_OPT_8: -Dspark.kubernetes.executor.podNamePrefix=fg-am00-raw12-b1c8112b8536304ab0fc64fcc41e0615 SPARK_JAVA_OPT_9: -Dspark.kubernetes.driver.pod.name=fg-am00-raw12-b1c8112b8536304ab0fc64fcc41e0615-driver SPARK_JAVA_OPT_10: -Dspark.driver.host=fg-am00-raw12-b1c8112b8536304ab0fc64fcc41e0615-driver-svc.experimental.svc SPARK_JAVA_OPT_11: -Dspark.executor.instances=5 SPARK_JAVA_OPT_12: -Dspark.hadoop.fs.s3a.server-side-encryption-algorithm=AES256 SPARK_JAVA_OPT_13: -Dspark.kubernetes.namespace=experimental SPARK_JAVA_OPT_14: -Dspark.kubernetes.authenticate.driver.serviceAccountName=experimental-service-account SPARK_JAVA_OPT_15: -Dspark.master=k8s://https://bigdata {code} Further, I did not see spec.initContainers section in the generated pod. 
Please see the details below {code:java} { "kind": "Pod", "apiVersion": "v1", "metadata": { "name": "fg-am00-raw12-b1c8112b8536304ab0fc64fcc41e0615-driver", "namespace": "experimental", "selfLink": "/api/v1/namespaces/experimental/pods/fg-am00-raw12-b1c8112b8536304ab0fc64fcc41e0615-driver", "uid": "adc5a50a-2342-11e8-87dc-12c5b3954044", "resourceVersion": "299054", "creationTimestamp": "2018-03-09T02:36:32Z", "labels": { "spark-app-selector": "spark-4fa9a5ce1b1d401fa9c1e413ff030d44", "spark-role": "driver" }, "annotations": { "spark-app-name": "fg-am00-raw12" } }, "spec": { "volumes": [ { "name": "experimental-service-account-token-msmth", "secret": { "secretName": "experimental-service-account-token-msmth", "defaultMode": 420 } } ], "containers": [ { "name": "spark-kubernetes-driver", "image": "docker.com/cmapp/fg-am00-raw:1.0.0", "args": [ "driver" ], "env": [ { "name": "SPARK_DRIVER_MEMORY", "value": "1g" }, { "name": "SPARK_DRIVER_CLASS", "value": "com.myapp.App" }, { "name": "SPARK_DRIVER_ARGS", "value": "-c /opt/spark/work-dir/app/main/environments/int -w ./../../workflows/workflow_main.json -e prod -n features -v off" }, { "name": "SPARK_DRIVER_BIND_ADDRESS", "valueFrom": { "fieldRef": { "apiVersion": "v1", "fieldPath": "status.podIP" } } }, { "name": "SPARK_MOUNTED_CLASSPATH", "value": "/opt/spark/jars/aws-java-sdk-1.7.4.jar:/opt/spark/jars/hadoop-aws-2.7.3.jar:/opt/spark/jars/guava-14.0.1.jar:/opt/spark/jars/datacleanup-component-1.0-SNAPSHOT.jar:/opt/spark/jars/SparkApp.jar" }, { "name": "SPARK_JAVA_OPT_0", "value": "-Dspark.submit.deployMode=cluster" }, { "name": "SPARK_JAVA_OPT_1", "value": "-Dspark.driver.blockManager.port=7079" }, { "name": "SPARK_JAVA_OPT_2", "value": "-Dspark.app.name=fg-am00-raw12" }, { "name": "SPARK_JAVA_OPT_3", "value": "-Dspark.kubernetes.container.image=docker.com/cmapp/fg-am00-raw:1.0.0" }, { "name": "SPARK_JAVA_OPT_4", "value": "-Dspark.app.id=spark-4fa9a5ce1b1d401fa9c1e413ff030d44" }, { "name": "SPARK_JAVA_OPT_5",
[jira] [Commented] (SPARK-23627) Provide isEmpty() function in DataSet
[ https://issues.apache.org/jira/browse/SPARK-23627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392468#comment-16392468 ] Apache Spark commented on SPARK-23627: -- User 'goungoun' has created a pull request for this issue: https://github.com/apache/spark/pull/20782 > Provide isEmpty() function in DataSet > - > > Key: SPARK-23627 > URL: https://issues.apache.org/jira/browse/SPARK-23627 > Project: Spark > Issue Type: Improvement > Components: Spark Core, SQL >Affects Versions: 2.1.0 >Reporter: Goun Na >Priority: Trivial > > Like rdd.isEmpty, adding isEmpty to DataSet would be useful. > Some code without isEmpty: > {code:java} > if (df.count() == 0) { do_something }{code} > Some people add limit 1 for performance reasons: > {code:java} > if (df.limit(1).rdd.isEmpty) { do_something } > if (df.rdd.take(1).isEmpty) { do_something }{code} > > If isEmpty is provided, the code will be perfectly clean: > {code:java} > if (df.isEmpty) { do_something }{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-23627) Provide isEmpty() function in DataSet
[ https://issues.apache.org/jira/browse/SPARK-23627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23627: Assignee: (was: Apache Spark) > Provide isEmpty() function in DataSet > - > > Key: SPARK-23627 > URL: https://issues.apache.org/jira/browse/SPARK-23627 > Project: Spark > Issue Type: Improvement > Components: Spark Core, SQL >Affects Versions: 2.1.0 >Reporter: Goun Na >Priority: Trivial > > Like rdd.isEmpty, adding isEmpty to DataSet would be useful. > Some code without isEmpty: > {code:java} > if (df.count() == 0) { do_something }{code} > Some people add limit 1 for performance reasons: > {code:java} > if (df.limit(1).rdd.isEmpty) { do_something } > if (df.rdd.take(1).isEmpty) { do_something }{code} > > If isEmpty is provided, the code will be perfectly clean: > {code:java} > if (df.isEmpty) { do_something }{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-23627) Provide isEmpty() function in DataSet
[ https://issues.apache.org/jira/browse/SPARK-23627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23627: Assignee: Apache Spark > Provide isEmpty() function in DataSet > - > > Key: SPARK-23627 > URL: https://issues.apache.org/jira/browse/SPARK-23627 > Project: Spark > Issue Type: Improvement > Components: Spark Core, SQL >Affects Versions: 2.1.0 >Reporter: Goun Na >Assignee: Apache Spark >Priority: Trivial > > Like rdd.isEmpty, adding isEmpty to DataSet would be useful. > Some code without isEmpty: > {code:java} > if (df.count() == 0) { do_something }{code} > Some people add limit 1 for performance reasons: > {code:java} > if (df.limit(1).rdd.isEmpty) { do_something } > if (df.rdd.take(1).isEmpty) { do_something }{code} > > If isEmpty is provided, the code will be perfectly clean: > {code:java} > if (df.isEmpty) { do_something }{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
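The performance argument quoted in the issue can be illustrated outside Spark: a count-based emptiness check walks the whole collection, while a take(1)-based check inspects at most one element. A small stand-alone Scala sketch on plain Iterable (not the Dataset API):

```scala
object EmptinessCheck {
  // count-style check: walks the entire collection, like df.count() == 0
  def isEmptyViaCount[A](xs: Iterable[A]): Boolean = xs.size == 0

  // take-style check: inspects at most one element, like
  // df.limit(1).rdd.isEmpty or df.rdd.take(1).isEmpty
  def isEmptyViaTake[A](xs: Iterable[A]): Boolean = xs.take(1).isEmpty
}
```

On a Dataset the gap is far larger than on an in-memory collection, since count() triggers a full distributed job while limit(1) stops after the first row found.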
[jira] [Assigned] (SPARK-23637) Yarn might allocate more resource if a same executor is killed multiple times.
[ https://issues.apache.org/jira/browse/SPARK-23637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23637: Assignee: Apache Spark > Yarn might allocate more resource if a same executor is killed multiple times. > -- > > Key: SPARK-23637 > URL: https://issues.apache.org/jira/browse/SPARK-23637 > Project: Spark > Issue Type: Bug > Components: YARN >Affects Versions: 2.3.0 >Reporter: jin xing >Assignee: Apache Spark >Priority: Major > > *{{YarnAllocator}}* uses *{{numExecutorsRunning}}* to track the number of > running executor. *{{numExecutorsRunning}}* is used to check if there're > executors missing and need to allocate more. > In current code, *{{numExecutorsRunning}}* can be negative when driver asks > to kill a same idle executor multiple times. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-23637) Yarn might allocate more resource if a same executor is killed multiple times.
[ https://issues.apache.org/jira/browse/SPARK-23637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23637: Assignee: (was: Apache Spark) > Yarn might allocate more resource if a same executor is killed multiple times. > -- > > Key: SPARK-23637 > URL: https://issues.apache.org/jira/browse/SPARK-23637 > Project: Spark > Issue Type: Bug > Components: YARN >Affects Versions: 2.3.0 >Reporter: jin xing >Priority: Major > > *{{YarnAllocator}}* uses *{{numExecutorsRunning}}* to track the number of > running executor. *{{numExecutorsRunning}}* is used to check if there're > executors missing and need to allocate more. > In current code, *{{numExecutorsRunning}}* can be negative when driver asks > to kill a same idle executor multiple times. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23637) Yarn might allocate more resource if a same executor is killed multiple times.
[ https://issues.apache.org/jira/browse/SPARK-23637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392458#comment-16392458 ] Apache Spark commented on SPARK-23637: -- User 'jinxing64' has created a pull request for this issue: https://github.com/apache/spark/pull/20781 > Yarn might allocate more resource if a same executor is killed multiple times. > -- > > Key: SPARK-23637 > URL: https://issues.apache.org/jira/browse/SPARK-23637 > Project: Spark > Issue Type: Bug > Components: YARN >Affects Versions: 2.3.0 >Reporter: jin xing >Priority: Major > > *{{YarnAllocator}}* uses *{{numExecutorsRunning}}* to track the number of > running executor. *{{numExecutorsRunning}}* is used to check if there're > executors missing and need to allocate more. > In current code, *{{numExecutorsRunning}}* can be negative when driver asks > to kill a same idle executor multiple times. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23637) Yarn might allocate more resource if a same executor is killed multiple times.
[ https://issues.apache.org/jira/browse/SPARK-23637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392457#comment-16392457 ] jin xing commented on SPARK-23637: -- PR here: https://github.com/apache/spark/pull/20781 > Yarn might allocate more resource if a same executor is killed multiple times. > -- > > Key: SPARK-23637 > URL: https://issues.apache.org/jira/browse/SPARK-23637 > Project: Spark > Issue Type: Bug > Components: YARN >Affects Versions: 2.3.0 >Reporter: jin xing >Priority: Major > > *{{YarnAllocator}}* uses *{{numExecutorsRunning}}* to track the number of > running executor. *{{numExecutorsRunning}}* is used to check if there're > executors missing and need to allocate more. > In current code, *{{numExecutorsRunning}}* can be negative when driver asks > to kill a same idle executor multiple times. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23637) Yarn might allocate more resource if a same executor is killed multiple times.
[ https://issues.apache.org/jira/browse/SPARK-23637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jin xing updated SPARK-23637: - Description: *{{YarnAllocator}}* uses *{{numExecutorsRunning}}* to track the number of running executor. *{{numExecutorsRunning}}* is used to check if there're executors missing and need to allocate more. In current code, *{{numExecutorsRunning}}* can be negative when driver asks to kill a same idle executor multiple times. was: YarnAllocator}} uses {{numExecutorsRunning to track the number of running executor. {{numExecutorsRunning}} is used to check if there're executors missing and need to allocate more. In current code, {{numExecutorsRunning}} can be negative when driver asks to kill a same idle executor multiple times. > Yarn might allocate more resource if a same executor is killed multiple times. > -- > > Key: SPARK-23637 > URL: https://issues.apache.org/jira/browse/SPARK-23637 > Project: Spark > Issue Type: Bug > Components: YARN >Affects Versions: 2.3.0 >Reporter: jin xing >Priority: Major > > *{{YarnAllocator}}* uses *{{numExecutorsRunning}}* to track the number of > running executor. *{{numExecutorsRunning}}* is used to check if there're > executors missing and need to allocate more. > In current code, *{{numExecutorsRunning}}* can be negative when driver asks > to kill a same idle executor multiple times. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23637) Yarn might allocate more resource if a same executor is killed multiple times.
[ https://issues.apache.org/jira/browse/SPARK-23637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jin xing updated SPARK-23637: - Description: YarnAllocator}} uses {{numExecutorsRunning to track the number of running executor. {{numExecutorsRunning}} is used to check if there're executors missing and need to allocate more. In current code, {{numExecutorsRunning}} can be negative when driver asks to kill a same idle executor multiple times. was: {{YarnAllocator }}uses {{numExecutorsRunning}} to track the number of running executor. {{numExecutorsRunning}} is used to check if there're executors missing and need to allocate more. In current code, {{numExecutorsRunning}} can be negative when driver asks to kill a same idle executor multiple times. > Yarn might allocate more resource if a same executor is killed multiple times. > -- > > Key: SPARK-23637 > URL: https://issues.apache.org/jira/browse/SPARK-23637 > Project: Spark > Issue Type: Bug > Components: YARN >Affects Versions: 2.3.0 >Reporter: jin xing >Priority: Major > > YarnAllocator}} uses {{numExecutorsRunning to track the number of > running executor. {{numExecutorsRunning}} is used to check if there're > executors missing and need to allocate more. > In current code, {{numExecutorsRunning}} can be negative when driver asks to > kill a same idle executor multiple times. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23637) Yarn might allocate more resource if a same executor is killed multiple times.
[ https://issues.apache.org/jira/browse/SPARK-23637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jin xing updated SPARK-23637: - Description: {{YarnAllocator }}uses {{numExecutorsRunning}} to track the number of running executor. {{numExecutorsRunning}} is used to check if there're executors missing and need to allocate more. In current code, {{numExecutorsRunning}} can be negative when driver asks to kill a same idle executor multiple times. was: {{YarnAllocator}} uses {{numExecutorsRunning}} to track the number of running executor. {{numExecutorsRunning}} is used to check if there're executors missing and need to allocate more. In current code, {{numExecutorsRunning}} can be negative when driver asks to kill a same idle executor multiple times. > Yarn might allocate more resource if a same executor is killed multiple times. > -- > > Key: SPARK-23637 > URL: https://issues.apache.org/jira/browse/SPARK-23637 > Project: Spark > Issue Type: Bug > Components: YARN >Affects Versions: 2.3.0 >Reporter: jin xing >Priority: Major > > {{YarnAllocator }}uses {{numExecutorsRunning}} to track the number of running > executor. {{numExecutorsRunning}} is used to check if there're executors > missing and need to allocate more. > In current code, {{numExecutorsRunning}} can be negative when driver asks to > kill a same idle executor multiple times. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23637) Yarn might allocate more resource if a same executor is killed multiple times.
jin xing created SPARK-23637: Summary: Yarn might allocate more resource if a same executor is killed multiple times. Key: SPARK-23637 URL: https://issues.apache.org/jira/browse/SPARK-23637 Project: Spark Issue Type: Bug Components: YARN Affects Versions: 2.3.0 Reporter: jin xing {{YarnAllocator}} uses {{numExecutorsRunning}} to track the number of running executor. {{numExecutorsRunning}} is used to check if there're executors missing and need to allocate more. In current code, {{numExecutorsRunning}} can be negative when driver asks to kill a same idle executor multiple times. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23637) Yarn might allocate more resource if a same executor is killed multiple times.
[ https://issues.apache.org/jira/browse/SPARK-23637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jin xing updated SPARK-23637: - Description: {{YarnAllocator}} uses {{numExecutorsRunning}} to track the number of running executor. {{numExecutorsRunning}} is used to check if there're executors missing and need to allocate more. In current code, {{numExecutorsRunning}} can be negative when driver asks to kill a same idle executor multiple times. was: {YarnAllocator} uses \{numExecutorsRunning} to track the number of running executor. \{numExecutorsRunning} is used to check if there're executors missing and need to allocate more. In current code, \{numExecutorsRunning} can be negative when driver asks to kill a same idle executor multiple times. > Yarn might allocate more resource if a same executor is killed multiple times. > -- > > Key: SPARK-23637 > URL: https://issues.apache.org/jira/browse/SPARK-23637 > Project: Spark > Issue Type: Bug > Components: YARN >Affects Versions: 2.3.0 >Reporter: jin xing >Priority: Major > > {{YarnAllocator}} > uses {{numExecutorsRunning}} to track the number of running executor. > {{numExecutorsRunning}} is used to check if there're executors missing and > need to allocate more. > In current code, {{numExecutorsRunning}} can be negative when driver asks to > kill a same idle executor multiple times. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
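The double-decrement described in SPARK-23637 can be sketched without YARN: if every kill request decrements the running count, two requests for the same executor drive it negative, and the allocator then requests extra containers. Guarding the decrement with a set of running ids makes it idempotent. This is an illustration only, with hypothetical class and field names rather than YarnAllocator's real internals:

```scala
import scala.collection.mutable

class AllocatorSketch(executorIds: Set[String]) {
  private val running = mutable.Set(executorIds.toSeq: _*)
  // Mirrors the role of numExecutorsRunning in YarnAllocator
  var numExecutorsRunning: Int = running.size

  def killExecutor(id: String): Unit = {
    // mutable.Set#remove returns false the second time the same id
    // arrives, so the counter is decremented at most once per executor
    if (running.remove(id)) numExecutorsRunning -= 1
  }
}
```

Because each decrement is tied to an actual removal, the count can never go below zero no matter how many duplicate kill requests the driver sends.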
[jira] [Updated] (SPARK-23636) [SPARK 2.2] | Kafka Consumer | KafkaUtils.createRDD throws Exception - java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
[ https://issues.apache.org/jira/browse/SPARK-23636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Deepak updated SPARK-23636: --- Description: h2. Summary While using the KafkaUtils.createRDD API, we receive the error listed below, specifically when 1 executor connects to 1 Kafka topic-partition with more than 1 core and fetches an Array of OffsetRanges. h2. Error Faced {noformat} java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access{noformat} Stack Trace {noformat} Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost task 5.3 in stage 1.0 (TID 17, host, executor 16): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1629) at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1528) at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1508) at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.close(CachedKafkaConsumer.scala:59) at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.remove(CachedKafkaConsumer.scala:185) at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:204) at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:181) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323){noformat} h2. Config Used to Simulate the Error A session with: * Executors - 1 * Cores - 2 or more * Kafka topic - has only 1 partition * While fetching - more than one OffsetRange, for example {noformat} Array(OffsetRange("kafka_topic",0,608954201,608954202), OffsetRange("kafka_topic",0,608954202,608954203) ){noformat} h2. Why are we fetching from Kafka as mentioned above? 
This gives us the capability to establish a connection to the Kafka broker for every Spark executor core, so each core can fetch and process its own set of messages based on the specified offset ranges. This worked in Spark 1.6.2; however, from Spark 2.1 onwards the pattern throws an exception. h2. Sample Code {quote}Scala snippet, on Spark 2.2.0 or 2.1.0 // Imports import kafka.serializer.{DefaultDecoder, StringDecoder} import org.apache.avro.generic.GenericRecord import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SQLContext} import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.types.{BinaryType, StringType, StructField, StructType} import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.kafka010.KafkaUtils._ {quote} {quote}// This forces two connections to the same broker for the partition specified below. val parallelizedRanges = Array(OffsetRange("kafka_topic",0,1,2), // fetches 1 sample record OffsetRange("kafka_topic",0,2,3) // fetches 1 sample record ) // Initialize Kafka properties val kafkaParams1: java.util.Map[String, Object] = new java.util.HashMap() // kafkaParams1.put("key","val") - add all the parameters such as broker and topic; not listing every property here. 
// Create the RDD val rDDConsumerRec: RDD[ConsumerRecord[String, String]] = createRDD[String, String](sparkContext, kafkaParams1, parallelizedRanges, LocationStrategies.PreferConsistent) // Map function val data: RDD[Row] = rDDConsumerRec.map { x => Row(x.topic().toString, x.partition().toString, x.offset().toString, x.timestamp().toString, x.value() ) } // Create a DataFrame val df = sqlContext.createDataFrame(data, StructType( Seq( StructField("topic", StringType), StructField("partition", StringType), StructField("offset", StringType), StructField("timestamp", StringType), StructField("value", BinaryType) ))) df.show() // You will see the error reported. {quote} h2. Similar Issue reported earlier, but on a different API A similar issue reported for DirectStream is https://issues.apache.org/jira/browse/SPARK-19185 was: h2. h2. Summary While using the KafkaUtils.createRDD API - we receive below listed error, specifically when 1 executor connects to 1 kafka topic-partition, but with more than 1 core & fetches an Array(OffsetRanges) h2. Error Faced {noformat} java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access{noformat} Stack Trace {noformat} Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost task 5.3 in stage 1.0 (TID 17, host, executor 16): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access at
[jira] [Updated] (SPARK-23636) [SPARK 2.2] | Kafka Consumer | KafkaUtils.createRDD throws Exception - java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
[ https://issues.apache.org/jira/browse/SPARK-23636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Deepak updated SPARK-23636: --- Description: h2. Summary While using the KafkaUtils.createRDD API, we receive the error listed below, specifically when 1 executor connects to 1 Kafka topic-partition with more than 1 core and fetches an Array of OffsetRanges. h2. Error Faced {noformat} java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access{noformat} Stack Trace {noformat} Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost task 5.3 in stage 1.0 (TID 17, host, executor 16): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1629) at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1528) at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1508) at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.close(CachedKafkaConsumer.scala:59) at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.remove(CachedKafkaConsumer.scala:185) at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:204) at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:181) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323){noformat} h2. Config Used to Simulate the Error A session with: * Executors - 1 * Cores - 2 or more * Kafka topic - has only 1 partition * While fetching - more than one OffsetRange, for example {noformat} Array(OffsetRange("kafka_topic",0,608954201,608954202), OffsetRange("kafka_topic",0,608954202,608954203) ){noformat} h2. Why are we fetching from Kafka as mentioned above? 
This gives us the ability to establish one connection to the Kafka broker per Spark executor core, so each core can fetch and process its own set of messages based on the specified offset ranges. This worked in Spark 1.6.2; however, from Spark 2.1 onwards the pattern throws the exception above.
h2. Sample Code
{code:scala}
// Scala snippet - reproduces on Spark 2.1.0 and 2.2.0
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{BinaryType, StringType, StructField, StructType}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.KafkaUtils._

// This forces two connections to the same broker for the partition specified below.
val parallelizedRanges = Array(
  OffsetRange("kafka_topic", 0, 1, 2), // fetching sample records
  OffsetRange("kafka_topic", 0, 2, 3)  // fetching sample records
)

// Initiate Kafka properties, e.g. kafkaParams1.put("key", "val") for the
// broker, topic, and so on. Not listing every property here.
val kafkaParams1: java.util.Map[String, Object] = new java.util.HashMap()

// Create the RDD
val rDDConsumerRec: RDD[ConsumerRecord[String, String]] =
  createRDD[String, String](sparkContext, kafkaParams1, parallelizedRanges,
    LocationStrategies.PreferConsistent)

// Map each ConsumerRecord to a Row
val data: RDD[Row] = rDDConsumerRec.map { x =>
  Row(x.topic().toString, x.partition().toString, x.offset().toString,
    x.timestamp().toString, x.value())
}

// Create a DataFrame
val df = sqlContext.createDataFrame(data, StructType(Seq(
  StructField("topic", StringType),
  StructField("partition", StringType),
  StructField("offset", StringType),
  StructField("timestamp", StringType),
  StructField("value", BinaryType)
)))

// Temp table (we are aware registerTempTable is deprecated)
df.registerTempTable("kafka_topic")

// Run some SQL
hiveContext.sql("select * from kafka_topic").show
{code}
h2. Similar Issue Reported Earlier, on a Different API
A similar issue reported for DirectStream: https://issues.apache.org/jira/browse/SPARK-19185
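One way to avoid two tasks sharing a partition's cached consumer is to coalesce contiguous offset ranges for the same topic-partition before calling createRDD, so each partition is fetched by exactly one task (and thus one cached KafkaConsumer). The sketch below is a hedged illustration, not a Spark API; the `Range` class is a hypothetical stand-in for Spark's `OffsetRange`, kept local so the example runs without Spark on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.spark.streaming.kafka010.OffsetRange.
final class Range {
    final String topic;
    final int partition;
    final long fromOffset;
    final long untilOffset;

    Range(String topic, int partition, long fromOffset, long untilOffset) {
        this.topic = topic;
        this.partition = partition;
        this.fromOffset = fromOffset;
        this.untilOffset = untilOffset;
    }
}

final class RangeCoalescer {
    // Merges adjacent, contiguous ranges for the same topic-partition.
    // Assumes the input lists each partition's ranges in offset order,
    // as in the report above.
    static List<Range> coalesce(List<Range> ranges) {
        List<Range> out = new ArrayList<>();
        for (Range r : ranges) {
            Range last = out.isEmpty() ? null : out.get(out.size() - 1);
            boolean contiguous = last != null
                    && last.topic.equals(r.topic)
                    && last.partition == r.partition
                    && last.untilOffset == r.fromOffset;
            if (contiguous) {
                // Extend the previous range instead of creating a second
                // task (and a second consumer) for the same partition.
                out.set(out.size() - 1,
                        new Range(last.topic, last.partition, last.fromOffset, r.untilOffset));
            } else {
                out.add(r);
            }
        }
        return out;
    }
}
```

With the two ranges from the report, `coalesce` returns one range covering offsets 608954201 to 608954203, so only one core fetches the partition. This trades per-core parallelism within a partition for safety, which is the per-partition parallelism model the kafka010 KafkaRDD is built around.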
[jira] [Assigned] (SPARK-23598) WholeStageCodegen can lead to IllegalAccessError calling append for HashAggregateExec
[ https://issues.apache.org/jira/browse/SPARK-23598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23598: Assignee: (was: Apache Spark) > WholeStageCodegen can lead to IllegalAccessError calling append for > HashAggregateExec > -- > > Key: SPARK-23598 > URL: https://issues.apache.org/jira/browse/SPARK-23598 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: David Vogelbacher >Priority: Major > > Got the following stacktrace for a large QueryPlan using WholeStageCodeGen: > {noformat} > java.lang.IllegalAccessError: tried to access method > org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V > from class > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass.agg_doAggregateWithKeysOutput$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) > at org.apache.spark.scheduler.Task.run(Task.scala:109) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345){noformat} > After disabling codegen, everything works. 
> The root cause seems to be that we are trying to call the protected _append_ > method of > [BufferedRowIterator|https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java#L68] > from an inner-class of a sub-class that is loaded by a different > class-loader (after codegen compilation). > [https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-5.html#jvms-5.4.4] > states that a protected method _R_ can be accessed only if one of the > following two conditions is fulfilled: > # R is protected and is declared in a class C, and D is either a subclass of > C or C itself. Furthermore, if R is not static, then the symbolic reference > to R must contain a symbolic reference to a class T, such that T is either a > subclass of D, a superclass of D, or D itself. > # R is either protected or has default access (that is, neither public nor > protected nor private), and is declared by a class in the same run-time > package as D. > 2.) doesn't apply as we have loaded the class with a different class loader > (and are in a different package) and 1.) doesn't apply because we are > apparently trying to call the method from an inner class of a subclass of > _BufferedRowIterator_. > Looking at the Code path of _WholeStageCodeGen_, the following happens: > # In > [WholeStageCodeGen|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L527], > we create the subclass of _BufferedRowIterator_, along with a _processNext_ > method for processing the output of the child plan. 
> # In the child, which is a [HashAggregateExec|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L517], we create the method that shows up at the top of the stack trace (called _doAggregateWithKeysOutput_).
> # We add this method to the compiled code by invoking _addNewFunction_ of [CodeGenerator|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L460]. In the generated function body we call the _append_ method.
> Now, the _addNewFunction_ method states that:
> {noformat}
> If the code for the `OuterClass` grows too large, the function will be
> inlined into a new private, inner class
> {noformat}
> This indeed seems to happen: the _doAggregateWithKeysOutput_ method is put
> into a new private inner class. Thus, it no longer has access to the protected
> _append_ method but still tries to call it, which results in the
> _IllegalAccessError_.
> Possible fixes:
> * Pass in the _inlineToOuterClass_ flag when invoking _addNewFunction_
> * Make the _append_ method public
> * Re-declare the _append_
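The inner-class access problem and the "re-declare _append_" style of fix can be sketched in plain Java. The names below (`BufferedIteratorBase`, `GeneratedIterator`, `AggHelper`) are illustrative stand-ins, not Spark's real generated classes: javac silently inserts synthetic accessors that make an inner class's call to an inherited protected method work, which is exactly what runtime-generated code loaded by a different classloader in a different runtime package lacks. An explicit bridge method removes the dependence on those accessors.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for org.apache.spark.sql.execution.BufferedRowIterator:
// append is protected, so only subclasses may call it directly.
abstract class BufferedIteratorBase {
    protected final List<String> buffer = new ArrayList<>();

    protected void append(String row) {
        buffer.add(row);
    }
}

// Stand-in for a generated iterator subclass.
class GeneratedIterator extends BufferedIteratorBase {
    // Bridge method: re-declares access to the inherited protected append
    // so nested helper classes can reach it without relying on
    // compiler-generated synthetic accessors.
    void appendBridge(String row) {
        append(row);
    }

    // Stand-in for the agg_NestedClass that addNewFunction spills large
    // methods into once the outer class grows too big.
    class AggHelper {
        void emit(String row) {
            appendBridge(row); // legal: the call goes through the outer class
        }
    }

    List<String> rows() {
        return buffer;
    }
}
```

This mirrors the third fix listed above; making `append` public or forcing `inlineToOuterClass` would achieve the same effect by different means.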
[jira] [Assigned] (SPARK-23598) WholeStageCodegen can lead to IllegalAccessError calling append for HashAggregateExec
[ https://issues.apache.org/jira/browse/SPARK-23598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23598: Assignee: Apache Spark > WholeStageCodegen can lead to IllegalAccessError calling append for > HashAggregateExec > -- > > Key: SPARK-23598 > URL: https://issues.apache.org/jira/browse/SPARK-23598 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: David Vogelbacher >Assignee: Apache Spark >Priority: Major > > Got the following stacktrace for a large QueryPlan using WholeStageCodeGen: > {noformat} > java.lang.IllegalAccessError: tried to access method > org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V > from class > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass.agg_doAggregateWithKeysOutput$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) > at org.apache.spark.scheduler.Task.run(Task.scala:109) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345){noformat} > After disabling codegen, everything works. 
[jira] [Commented] (SPARK-23598) WholeStageCodegen can lead to IllegalAccessError calling append for HashAggregateExec
[ https://issues.apache.org/jira/browse/SPARK-23598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392360#comment-16392360 ] Apache Spark commented on SPARK-23598: -- User 'kiszk' has created a pull request for this issue: https://github.com/apache/spark/pull/20779 > WholeStageCodegen can lead to IllegalAccessError calling append for > HashAggregateExec > -- > > Key: SPARK-23598 > URL: https://issues.apache.org/jira/browse/SPARK-23598 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: David Vogelbacher >Priority: Major > > Got the following stacktrace for a large QueryPlan using WholeStageCodeGen: > {noformat} > java.lang.IllegalAccessError: tried to access method > org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V > from class > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass.agg_doAggregateWithKeysOutput$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) > at org.apache.spark.scheduler.Task.run(Task.scala:109) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345){noformat} > After disabling codegen, everything 
works. 
[jira] [Comment Edited] (SPARK-23598) WholeStageCodegen can lead to IllegalAccessError calling append for HashAggregateExec
[ https://issues.apache.org/jira/browse/SPARK-23598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391834#comment-16391834 ] Kazuaki Ishizaki edited comment on SPARK-23598 at 3/9/18 3:20 AM: -- Thanks, I confirmed that I can reproduce this issue with the master. Generated code is here {code:java} /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIteratorForCodegenStage1(references); /* 003 */ } /* 004 */ /* 005 */ // codegenStageId=1 /* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator { /* 007 */ private Object[] references; /* 008 */ private scala.collection.Iterator[] inputs; /* 009 */ private boolean agg_initAgg; /* 010 */ private boolean agg_bufIsNull; /* 011 */ private double agg_bufValue; /* 012 */ private boolean agg_bufIsNull1; /* 013 */ private long agg_bufValue1; /* 014 */ private agg_FastHashMap agg_fastHashMap; /* 015 */ private org.apache.spark.unsafe.KVIterator agg_fastHashMapIter; /* 016 */ private org.apache.spark.unsafe.KVIterator agg_mapIter; /* 017 */ private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap; /* 018 */ private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter; /* 019 */ private scala.collection.Iterator inputadapter_input; /* 020 */ private boolean agg_agg_isNull11; /* 021 */ private boolean agg_agg_isNull25; /* 022 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] agg_mutableStateArray1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[2]; /* 023 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] agg_mutableStateArray2 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2]; /* 024 */ private UnsafeRow[] agg_mutableStateArray = new UnsafeRow[2]; /* 025 */ /* 026 */ public GeneratedIteratorForCodegenStage1(Object[] references) { /* 027 */ this.references = references; /* 028 */ } 
/* 029 */ /* 030 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 031 */ partitionIndex = index; /* 032 */ this.inputs = inputs; /* 033 */ /* 034 */ agg_fastHashMap = new agg_FastHashMap(((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).getTaskMemoryManager(), ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).getEmptyAggregationBuffer()); /* 035 */ agg_hashMap = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).createHashMap(); /* 036 */ inputadapter_input = inputs[0]; /* 037 */ agg_mutableStateArray[0] = new UnsafeRow(1); /* 038 */ agg_mutableStateArray1[0] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_mutableStateArray[0], 32); /* 039 */ agg_mutableStateArray2[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_mutableStateArray1[0], 1); /* 040 */ agg_mutableStateArray[1] = new UnsafeRow(3); /* 041 */ agg_mutableStateArray1[1] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_mutableStateArray[1], 32); /* 042 */ agg_mutableStateArray2[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_mutableStateArray1[1], 3); /* 043 */ /* 044 */ } /* 045 */ /* 046 */ public class agg_FastHashMap { /* 047 */ private org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch batch; /* 048 */ private int[] buckets; /* 049 */ private int capacity = 1 << 16; /* 050 */ private double loadFactor = 0.5; /* 051 */ private int numBuckets = (int) (capacity / loadFactor); /* 052 */ private int maxSteps = 2; /* 053 */ private int numRows = 0; /* 054 */ private org.apache.spark.sql.types.StructType keySchema = new org.apache.spark.sql.types.StructType().add(((java.lang.String) references[1] /* keyName */), org.apache.spark.sql.types.DataTypes.StringType); /* 055 */ private org.apache.spark.sql.types.StructType valueSchema = new 
org.apache.spark.sql.types.StructType().add(((java.lang.String) references[2] /* keyName */), org.apache.spark.sql.types.DataTypes.DoubleType) /* 056 */ .add(((java.lang.String) references[3] /* keyName */), org.apache.spark.sql.types.DataTypes.LongType); /* 057 */ private Object emptyVBase; /* 058 */ private long emptyVOff; /* 059 */ private int emptyVLen; /* 060 */ private boolean isBatchFull = false; /* 061 */ /* 062 */ public agg_FastHashMap( /* 063 */ org.apache.spark.memory.TaskMemoryManager taskMemoryManager, /* 064 */ InternalRow emptyAggregationBuffer) { /* 065 */ batch = org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch /* 066 */ .allocate(keySchema, valueSchema,
[jira] [Updated] (SPARK-23636) [SPARK 2.2] | Kafka Consumer | KafkaUtils.createRDD throws Exception - java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
[ https://issues.apache.org/jira/browse/SPARK-23636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Deepak updated SPARK-23636: --- Description: While using the KafkaUtils.createRDD API, we receive the error listed below, especially when one executor connects to one Kafka topic-partition with more than one core and fetches an Array of OffsetRanges. h2. Error Faced {noformat} java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access{noformat} Stack Trace {noformat} Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost task 5.3 in stage 1.0 (TID 17, host, executor 16): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1629) at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1528) at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1508) at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.close(CachedKafkaConsumer.scala:59) at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.remove(CachedKafkaConsumer.scala:185) at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:204) at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:181) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323){noformat} h2. Config Used to simulate the error A session with: * Executors - 1 * Cores - 2 or more * Kafka topic - has only 1 partition * While fetching - more than one OffsetRange, for example: {noformat} Array(OffsetRange("kafka_topic",0,608954201,608954202), OffsetRange("kafka_topic",0,608954202,608954203) ){noformat} h2. Why are we fetching from Kafka as mentioned above 
This gives us the capability to establish a connection to the Kafka broker for every Spark executor core, so each core can fetch and process its own set of messages based on the specified offset ranges. This was working in Spark 1.6.2; however, from Spark 2.1 onwards the pattern throws the exception. h2. Sample Code {quote}scala // This forces two connections to the same broker for the partition specified below. val parallelizedRanges = Array(OffsetRange("kafka_topic",0,1,2), // Fetching sample 2 records OffsetRange("kafka_topic",0,2,3) // Fetching sample 2 records ); val kafkaParams1: java.util.Map[String, Object] = new java.util.HashMap() val rDDConsumerRec: RDD[ConsumerRecord[String, String]] = createRDD[String, String](hiveContext.sparkContext , kafkaParams1, parallelizedRanges, LocationStrategies.PreferConsistent); val data: RDD[Row] = rDDConsumerRec.map { x => Row(x.topic().toString, x.partition().toString, x.offset().toString, x.timestamp().toString, x.value() ) }; val df = sqlContext.createDataFrame(data, StructType( Seq( StructField("topic", StringType), StructField("partition", StringType), StructField("offset", StringType), StructField("timestamp", StringType), StructField("value", BinaryType) ))); df.cache; df.registerTempTable("kafka_topic"); hiveContext.sql(""" select * from kafka_topic """).show {quote} h2. Related Issue A similar issue reported for DirectStream is https://issues.apache.org/jira/browse/SPARK-19185 was: While using the KafkaUtils.createRDD API, we receive the error listed below, especially when one executor connects to one Kafka topic-partition with more than one core and fetches an Array of OffsetRanges. h2. 
Error Faced {noformat} java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access{noformat} Stack Trace {noformat} Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost task 5.3 in stage 1.0 (TID 17, host, executor 16): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1629) at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1528) at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1508) at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.close(CachedKafkaConsumer.scala:59) at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.remove(CachedKafkaConsumer.scala:185) at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:204) at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:181) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323){noformat} h2. Config Used to simulate the error A session with : * Executors - 1 * Cores - 2 or More * Kafka Topic - has only 1 partition * While fetching - More than one Array of Offset Range , Example {noformat}
[jira] [Created] (SPARK-23636) [SPARK 2.2] | Kafka Consumer | KafkaUtils.createRDD throws Exception - java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
Deepak created SPARK-23636: -- Summary: [SPARK 2.2] | Kafka Consumer | KafkaUtils.createRDD throws Exception - java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access Key: SPARK-23636 URL: https://issues.apache.org/jira/browse/SPARK-23636 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.2.0, 2.1.1 Reporter: Deepak While using the KafkaUtils.createRDD API, we receive the error listed below, especially when one executor connects to one Kafka topic-partition with more than one core and fetches an Array of OffsetRanges. h2. Error Faced {noformat} Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost task 5.3 in stage 1.0 (TID 17, host, executor 16): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1629) at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1528) at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1508) at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.close(CachedKafkaConsumer.scala:59) at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.remove(CachedKafkaConsumer.scala:185) at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:204) at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:181) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323){noformat} h2. Config Used to simulate the error A session with: * Executors - 1 * Cores - 2 or more * Kafka topic - has only 1 partition * While fetching - more than one OffsetRange, for example: {noformat} Array(OffsetRange("kafka_topic",0,608954201,608954202), OffsetRange("kafka_topic",0,608954202,608954203) ){noformat} h2. Why are we fetching from Kafka as mentioned above 
This gives us a connection for every core available in the Spark executor, so each core can fetch and process its own set of messages (offset ranges) from Kafka. This was working in Spark 1.6.2; however, from Spark 2.1 onwards the pattern throws the exception. h2. Sample Code {quote}scala // This forces two connections to the same broker for the partition specified below. val parallelizedRanges = Array(OffsetRange("kafka_topic",0,1,2), // Fetching sample 2 records OffsetRange("kafka_topic",0,2,3) // Fetching sample 2 records ); val kafkaParams1: java.util.Map[String, Object] = new java.util.HashMap() val rDDConsumerRec: RDD[ConsumerRecord[String, String]] = createRDD[String, String](hiveContext.sparkContext , kafkaParams1, parallelizedRanges, LocationStrategies.PreferConsistent); val data: RDD[Row] = rDDConsumerRec.map { x => Row(x.topic().toString, x.partition().toString, x.offset().toString, x.timestamp().toString, x.value() ) }; val df = sqlContext.createDataFrame(data, StructType( Seq( StructField("topic", StringType), StructField("partition", StringType), StructField("offset", StringType), StructField("timestamp", StringType), StructField("value", BinaryType) ))); df.cache; df.registerTempTable("kafka_topic"); hiveContext.sql(""" select * from kafka_topic """).show {quote} h2. Related Issue A similar issue reported for DirectStream is https://issues.apache.org/jira/browse/SPARK-19185 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
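For context on why this particular error message appears: KafkaConsumer enforces single-threaded use with a lightweight acquire/release guard on every public method, including close(). The sketch below is a simplified, hypothetical reconstruction of that guard, not the actual Kafka source; it shows why two executor cores sharing one cached consumer for the same topic-partition fail exactly as in the stack trace, where the failure surfaces in CachedKafkaConsumer's cleanup path because close() acquires too.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified sketch of the single-thread guard behind the
// "KafkaConsumer is not safe for multi-threaded access" error.
class SingleThreadedConsumerSketch {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    private int refCount = 0;

    void acquire() {
        long threadId = Thread.currentThread().getId();
        // Fail fast if a different thread already holds the consumer.
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            throw new ConcurrentModificationException(
                    "KafkaConsumer is not safe for multi-threaded access");
        }
        refCount++;
    }

    void release() {
        if (--refCount == 0) {
            currentThread.set(NO_CURRENT_THREAD);
        }
    }

    // Stand-in for any consumer call (poll, close, ...): guard, work, release.
    void poll() {
        acquire();
        try {
            // fetch records here
        } finally {
            release();
        }
    }
}

class GuardDemo {
    public static void main(String[] args) {
        SingleThreadedConsumerSketch consumer = new SingleThreadedConsumerSketch();
        consumer.poll(); // single-threaded access is fine
        System.out.println("single-threaded access ok");
    }
}
```

Under this model, one cached consumer per topic-partition plus two cores running tasks for the two OffsetRanges above means two threads enter the guard concurrently, and the second one throws.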
[jira] [Commented] (SPARK-23600) conda_panda_example test fails to import panda lib with Spark 2.3
[ https://issues.apache.org/jira/browse/SPARK-23600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392301#comment-16392301 ] Hyukjin Kwon commented on SPARK-23600: -- ping [~ssha...@hortonworks.com] > conda_panda_example test fails to import panda lib with Spark 2.3 > - > > Key: SPARK-23600 > URL: https://issues.apache.org/jira/browse/SPARK-23600 > Project: Spark > Issue Type: Bug > Components: Spark Submit >Affects Versions: 2.3.0 > Environment: ambari-server --version 2.7.0.2-64 > HDP-3.0.0.2-132 >Reporter: Supreeth Sharma >Priority: Major > > With Spark2.3, conda panda test is failing to import panda. > python version: Python 2.7.5 > 1) Create Requirement file. > virtual_env_type : Native > {code:java} > packaging==16.8 > panda==0.3.1 > pyparsing==2.1.10 > requests==2.13.0 > six==1.10.0 > numpy==1.12.0 > pandas==0.19.2 > python-dateutil==2.6.0 > pytz==2016.10 > {code} > virtual_env_type : conda > {code:java} > mkl=2017.0.1=0 > numpy=1.12.0=py27_0 > openssl=1.0.2k=0 > pandas=0.19.2=np112py27_1 > pip=9.0.1=py27_1 > python=2.7.13=0 > python-dateutil=2.6.0=py27_0 > pytz=2016.10=py27_0 > readline=6.2=2 > setuptools=27.2.0=py27_0 > six=1.10.0=py27_0 > sqlite=3.13.0=0 > tk=8.5.18=0 > wheel=0.29.0=py27_0 > zlib=1.2.8=3 > {code} > 2) Run conda panda test > {code:java} > spark-submit --master yarn-client --jars > /usr/hdp/current/hadoop-client/lib/hadoop-lzo-0.6.0.3.0.0.2-132.jar --conf > spark.pyspark.virtualenv.enabled=true --conf > spark.pyspark.virtualenv.type=native --conf > spark.pyspark.virtualenv.requirements=/tmp/requirements.txt --conf > spark.pyspark.virtualenv.bin.path=/usr/bin/virtualenv > /hwqe/hadoopqe/tests/spark/data/conda_panda_example.py 2>&1 | tee > /tmp/1/Spark_clientLogs/pyenv_conda_panda_example_native_yarn-client.log > {code} > 3) Application fail to import panda. 
> {code:java} > 2018-03-05 13:43:31,493|INFO|MainThread|machine.py:167 - > run()||GUID=a3cb88f7-bf55-4d9e-9cfe-3e44eae3a72b|18/03/05 13:43:31 INFO > YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling > beginning after reached minRegisteredResourcesRatio: 0.8 > 2018-03-05 13:43:31,527|INFO|MainThread|machine.py:167 - > run()||GUID=a3cb88f7-bf55-4d9e-9cfe-3e44eae3a72b|Traceback (most recent call > last): > 2018-03-05 13:43:31,527|INFO|MainThread|machine.py:167 - > run()||GUID=a3cb88f7-bf55-4d9e-9cfe-3e44eae3a72b|File > "/hwqe/hadoopqe/tests/spark/data/conda_panda_example.py", line 5, in > 2018-03-05 13:43:31,528|INFO|MainThread|machine.py:167 - > run()||GUID=a3cb88f7-bf55-4d9e-9cfe-3e44eae3a72b|import pandas as pd > 2018-03-05 13:43:31,528|INFO|MainThread|machine.py:167 - > run()||GUID=a3cb88f7-bf55-4d9e-9cfe-3e44eae3a72b|ImportError: No module named > pandas > 2018-03-05 13:43:31,547|INFO|MainThread|machine.py:167 - > run()||GUID=a3cb88f7-bf55-4d9e-9cfe-3e44eae3a72b|18/03/05 13:43:31 INFO > BlockManagerMasterEndpoint: Registering block manager > ctr-e138-1518143905142-67599-01-05.hwx.site:44861 with 366.3 MB RAM, > BlockManagerId(2, ctr-e138-1518143905142-67599-01-05.hwx.site, 44861, > None){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23613) Different Analyzed logical plan data types for the same table in different queries
[ https://issues.apache.org/jira/browse/SPARK-23613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392298#comment-16392298 ] Hyukjin Kwon commented on SPARK-23613: -- Let's avoid setting a blocker, which is usually reserved for committers. Mind if I ask for a self-contained reproducer? I would like to check it. > Different Analyzed logical plan data types for the same table in different > queries > -- > > Key: SPARK-23613 > URL: https://issues.apache.org/jira/browse/SPARK-23613 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 > Environment: Spark 2.2.0 > Hive: 2 >Reporter: Ramandeep Singh >Priority: Major > Labels: SparkSQL > > Hi, > The column datatypes are correctly analyzed for simple select query. Note > that the problematic column is not selected anywhere in the complicated > scenario. > Let's say Select * from a; > Now let's say there is a query involving temporary view on another table and > its join with this table. > Let's call that table b (temporary view on a dataframe); > select * from jq ( select a.col1, b.col2 from a,b where a.col3=b.col3) > Fails with exception on some column not part of the projection in the join > query > Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot up > cast `a`.col5 from decimal(8,0) to col5#1234: decimal(6,2) as it may > truncate. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23613) Different Analyzed logical plan data types for the same table in different queries
[ https://issues.apache.org/jira/browse/SPARK-23613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon updated SPARK-23613: - Priority: Major (was: Blocker) > Different Analyzed logical plan data types for the same table in different > queries > -- > > Key: SPARK-23613 > URL: https://issues.apache.org/jira/browse/SPARK-23613 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 > Environment: Spark 2.2.0 > Hive: 2 >Reporter: Ramandeep Singh >Priority: Major > Labels: SparkSQL > > Hi, > The column datatypes are correctly analyzed for simple select query. Note > that the problematic column is not selected anywhere in the complicated > scenario. > Let's say Select * from a; > Now let's say there is a query involving temporary view on another table and > its join with this table. > Let's call that table b (temporary view on a dataframe); > select * from jq ( select a.col1, b.col2 from a,b where a.col3=b.col3) > Fails with exception on some column not part of the projection in the join > query > Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot up > cast `a`.col5 from decimal(8,0) to col5#1234: decimal(6,2) as it may > truncate. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23635) Spark executor env variable is overwritten by same name AM env variable
Saisai Shao created SPARK-23635: --- Summary: Spark executor env variable is overwritten by same name AM env variable Key: SPARK-23635 URL: https://issues.apache.org/jira/browse/SPARK-23635 Project: Spark Issue Type: Bug Components: YARN Affects Versions: 2.3.0 Reporter: Saisai Shao In the current Spark on YARN code, the AM will always copy and overwrite its env variables to executors, so we cannot set different values for executors. To reproduce the issue, a user could start spark-shell like: {code} ./bin/spark-shell --master yarn-client --conf spark.executorEnv.SPARK_ABC=executor_val --conf spark.yarn.appMasterEnv.SPARK_ABC=am_val {code} Then check executor env variables by {code} sc.parallelize(1 to 1).flatMap { i => sys.env.toSeq }.collect.foreach(println) {code} You will always get {{am_val}} instead of {{executor_val}}. So we should not let the AM overwrite specifically set executor env variables. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23635) Spark executor env variable is overwritten by same name AM env variable
[ https://issues.apache.org/jira/browse/SPARK-23635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Saisai Shao updated SPARK-23635: Description: In the current Spark on YARN code, AM always will copy and overwrite its env variables to executors, so we cannot set different values for executors. To reproduce issue, user could start spark-shell like: {code:java} ./bin/spark-shell --master yarn-client --conf spark.executorEnv.SPARK_ABC=executor_val --conf spark.yarn.appMasterEnv.SPARK_ABC=am_val {code} Then check executor env variables by {code:java} sc.parallelize(1 to 1).flatMap \{ i => sys.env.toSeq }.collect.foreach(println) {code} You will always get {{am_val}} instead of {{executor_val}}. So we should not let AM to overwrite specifically set executor env variables. was: In the current Spark on YARN code, AM always will copy and overwrite its env variables to executors, so we cannot set different values of executors. To reproduce issue, user could start spark-shell like: {code} ./bin/spark-shell --master yarn-client --conf spark.executorEnv.SPARK_ABC=executor_val --conf spark.yarn.appMasterEnv.SPARK_ABC=am_val {code} Then check executor env variables by {code} sc.parallelize(1 to 1).flatMap \{ i => sys.env.toSeq }.collect.foreach(println) {code} You will always get \{{am_val}} instead of {{executor_val}}. So we should not let AM to overwrite specifically set executor env variables. > Spark executor env variable is overwritten by same name AM env variable > --- > > Key: SPARK-23635 > URL: https://issues.apache.org/jira/browse/SPARK-23635 > Project: Spark > Issue Type: Bug > Components: YARN >Affects Versions: 2.3.0 >Reporter: Saisai Shao >Priority: Minor > > In the current Spark on YARN code, AM always will copy and overwrite its env > variables to executors, so we cannot set different values for executors. 
> To reproduce the issue, a user could start spark-shell like: > {code:java} > ./bin/spark-shell --master yarn-client --conf > spark.executorEnv.SPARK_ABC=executor_val --conf > spark.yarn.appMasterEnv.SPARK_ABC=am_val > {code} > Then check executor env variables by > {code:java} > sc.parallelize(1 to 1).flatMap { i => sys.env.toSeq > }.collect.foreach(println) > {code} > You will always get {{am_val}} instead of {{executor_val}}. So we should not > let the AM overwrite specifically set executor env variables. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
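The bug reads as a plain merge-precedence problem. The sketch below uses assumed semantics (it is not the actual YARN backend code): the reported behavior corresponds to copying the AM env over the executor env last, and the proposed fix to letting explicitly set executor keys win while AM-only variables still propagate.

```python
# Assumed merge semantics (not the actual Spark-on-YARN code): the bug is
# that the AM's env is applied over the executor env last, so am_val wins.
am_env = {"SPARK_ABC": "am_val", "SPARK_ONLY_AM": "x"}
executor_env = {"SPARK_ABC": "executor_val"}

# Reported behavior: AM env variables copied over the executor env:
broken = {**executor_env, **am_env}
assert broken["SPARK_ABC"] == "am_val"

# Proposed behavior: explicitly set executor env keys take precedence,
# while AM-only variables still propagate to executors:
fixed = {**am_env, **executor_env}
assert fixed["SPARK_ABC"] == "executor_val"
assert fixed["SPARK_ONLY_AM"] == "x"
```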
[jira] [Assigned] (SPARK-10884) Support prediction on single instance for regression and classification related models
[ https://issues.apache.org/jira/browse/SPARK-10884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joseph K. Bradley reassigned SPARK-10884: - Assignee: Weichen Xu (was: Yanbo Liang) > Support prediction on single instance for regression and classification > related models > -- > > Key: SPARK-10884 > URL: https://issues.apache.org/jira/browse/SPARK-10884 > Project: Spark > Issue Type: Sub-task > Components: ML >Reporter: Yanbo Liang >Assignee: Weichen Xu >Priority: Major > Labels: 2.2.0 > > Support prediction on single instance for regression and classification > related models (i.e., PredictionModel, ClassificationModel and their sub > classes). > Add corresponding test cases. > See parent issue for more details. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21568) ConsoleProgressBar should only be enabled in shells
[ https://issues.apache.org/jira/browse/SPARK-21568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392196#comment-16392196 ] Matthias Boehm commented on SPARK-21568: I just upgraded to Spark 2.3 and was about to file a bug for the missing progress indicators when running through spark-submit in yarn client mode. It's not a major issue but from my perspective, the new behavior is inconsistent with the documentation which says that {{--conf spark.ui.showConsoleProgress=true}} is the default. > ConsoleProgressBar should only be enabled in shells > --- > > Key: SPARK-21568 > URL: https://issues.apache.org/jira/browse/SPARK-21568 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Marcelo Vanzin >Assignee: Dongjoon Hyun >Priority: Minor > Fix For: 2.3.0 > > > This is the current logic that enables the progress bar: > {code} > _progressBar = > if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && > !log.isInfoEnabled) { > Some(new ConsoleProgressBar(this)) > } else { > None > } > {code} > That is based on the logging level; it just happens to align with the default > configuration for shells (WARN) and normal apps (INFO). > But if someone changes the default logging config for their app, this may > break; they may silence logs by setting the default level to WARN or ERROR, > and a normal application will see a lot of log spam from the progress bar > (which is especially bad when output is redirected to a file, as is usually > done when running in cluster mode). > While it's possible to disable the progress bar separately, this behavior is > not really expected. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
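The gating logic quoted in the issue can be paraphrased as a small predicate, which makes the inconsistency Matthias observed visible: the bar depends on the logging level, not just on the documented conf. This is a hedged Python paraphrase, not Spark code.

```python
# Paraphrase of the quoted Scala logic (not Spark code): the progress bar
# is created only when the conf is true AND the effective root log level
# is above INFO (i.e. log.isInfoEnabled is false).
def progress_bar_enabled(show_console_progress: bool, log_level: str) -> bool:
    info_enabled = log_level in ("TRACE", "DEBUG", "INFO")  # log.isInfoEnabled
    return show_console_progress and not info_enabled

# Shell default root level is WARN, so the bar shows:
assert progress_bar_enabled(True, "WARN")
# A normal app defaults to INFO, so the bar is silently disabled even
# though spark.ui.showConsoleProgress defaults to true:
assert not progress_bar_enabled(True, "INFO")
# And an app that silences logs to ERROR suddenly gets the bar:
assert progress_bar_enabled(True, "ERROR")
```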
[jira] [Commented] (SPARK-23584) Add interpreted execution to NewInstance expression
[ https://issues.apache.org/jira/browse/SPARK-23584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392155#comment-16392155 ] Apache Spark commented on SPARK-23584: -- User 'maropu' has created a pull request for this issue: https://github.com/apache/spark/pull/20778 > Add interpreted execution to NewInstance expression > --- > > Key: SPARK-23584 > URL: https://issues.apache.org/jira/browse/SPARK-23584 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Herman van Hovell >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-23584) Add interpreted execution to NewInstance expression
[ https://issues.apache.org/jira/browse/SPARK-23584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23584: Assignee: Apache Spark > Add interpreted execution to NewInstance expression > --- > > Key: SPARK-23584 > URL: https://issues.apache.org/jira/browse/SPARK-23584 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Herman van Hovell >Assignee: Apache Spark >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-23584) Add interpreted execution to NewInstance expression
[ https://issues.apache.org/jira/browse/SPARK-23584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23584: Assignee: (was: Apache Spark) > Add interpreted execution to NewInstance expression > --- > > Key: SPARK-23584 > URL: https://issues.apache.org/jira/browse/SPARK-23584 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Herman van Hovell >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23162) PySpark ML LinearRegressionSummary missing r2adj
[ https://issues.apache.org/jira/browse/SPARK-23162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392097#comment-16392097 ] kevin yu commented on SPARK-23162: -- Currently testing the code; will open a PR soon. Kevin > PySpark ML LinearRegressionSummary missing r2adj > > > Key: SPARK-23162 > URL: https://issues.apache.org/jira/browse/SPARK-23162 > Project: Spark > Issue Type: Improvement > Components: ML, PySpark >Affects Versions: 2.3.0 >Reporter: Bryan Cutler >Priority: Minor > Labels: starter > > Missing the Python API for {{r2adj}} in {{LinearRegressionSummary}} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23634) AttributeReferences may be too conservative wrt nullability after optimization
Henry Robinson created SPARK-23634: -- Summary: AttributeReferences may be too conservative wrt nullability after optimization Key: SPARK-23634 URL: https://issues.apache.org/jira/browse/SPARK-23634 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.0 Reporter: Henry Robinson An {{AttributeReference}} effectively caches the nullability of its referent when it is created. Some optimization rules can transform a nullable attribute into a non-nullable one, but the references to it are not updated. We could add a transformation rule that visits every {{AttributeReference}} and fixes its nullability after optimization. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
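The staleness Henry describes, and the proposed fix-up pass, can be shown with hypothetical classes (catalyst's real AttributeReference is a Scala case class; the names below are illustrative only): the reference caches nullability at construction, an optimizer rule later narrows the underlying attribute, and a post-optimization pass re-resolves the cached value.

```python
# Hypothetical sketch (not catalyst's actual classes): an attribute
# reference caches nullability when it is created, so later optimizer
# changes to the attribute are not reflected in the reference.
class Attribute:
    def __init__(self, name, nullable):
        self.name, self.nullable = name, nullable

class AttributeReference:
    def __init__(self, attr):
        self.name = attr.name
        self.nullable = attr.nullable  # cached at creation time

col = Attribute("a", nullable=True)
ref = AttributeReference(col)

# An optimizer rule proves the attribute non-nullable...
col.nullable = False
# ...but the reference still carries the stale, conservative value:
assert ref.nullable is True

# Proposed post-optimization pass: visit every AttributeReference and
# refresh its nullability from the attribute it refers to.
def fixup_nullability(refs, attrs_by_name):
    for r in refs:
        r.nullable = attrs_by_name[r.name].nullable

fixup_nullability([ref], {"a": col})
assert ref.nullable is False
```

The stale value is merely conservative (nullable when it need not be), which is why the ticket is filed as an improvement rather than a correctness bug.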
[jira] [Resolved] (SPARK-23271) Parquet output contains only "_SUCCESS" file after empty DataFrame saving
[ https://issues.apache.org/jira/browse/SPARK-23271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-23271. - Resolution: Fixed Fix Version/s: 2.4.0 Issue resolved by pull request 20525 [https://github.com/apache/spark/pull/20525] > Parquet output contains only "_SUCCESS" file after empty DataFrame saving > -- > > Key: SPARK-23271 > URL: https://issues.apache.org/jira/browse/SPARK-23271 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Pavlo Z. >Assignee: Dilip Biswal >Priority: Minor > Fix For: 2.4.0 > > Attachments: parquet-empty-output.zip > > > Sophisticated case, reproduced only if read empty CSV file without header > with assigned schema. > Steps for reproduce (Scala): > {code:java} > val anySchema = StructType(StructField("anyName", StringType, nullable = > false) :: Nil) > val inputDF = spark.read.schema(anySchema).csv(inputFolderWithEmptyCSVFile) > inputDF.write.parquet(outputFolderName) > // Exception: org.apache.spark.sql.AnalysisException: Unable to infer schema > for Parquet. It must be specified manually.; > val actualDF = spark.read.parquet(outputFolderName) > > {code} > *Actual:* Only "_SUCCESS" file in output directory > *Expected*: at least one Parquet file with schema. > Project for reproduce is attached. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-23271) Parquet output contains only "_SUCCESS" file after empty DataFrame saving
[ https://issues.apache.org/jira/browse/SPARK-23271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan reassigned SPARK-23271: --- Assignee: Dilip Biswal > Parquet output contains only "_SUCCESS" file after empty DataFrame saving > -- > > Key: SPARK-23271 > URL: https://issues.apache.org/jira/browse/SPARK-23271 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Pavlo Z. >Assignee: Dilip Biswal >Priority: Minor > Fix For: 2.4.0 > > Attachments: parquet-empty-output.zip > > > Sophisticated case, reproduced only if read empty CSV file without header > with assigned schema. > Steps for reproduce (Scala): > {code:java} > val anySchema = StructType(StructField("anyName", StringType, nullable = > false) :: Nil) > val inputDF = spark.read.schema(anySchema).csv(inputFolderWithEmptyCSVFile) > inputDF.write.parquet(outputFolderName) > // Exception: org.apache.spark.sql.AnalysisException: Unable to infer schema > for Parquet. It must be specified manually.; > val actualDF = spark.read.parquet(outputFolderName) > > {code} > *Actual:* Only "_SUCCESS" file in output directory > *Expected*: at least one Parquet file with schema. > Project for reproduce is attached. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21030) extend hint syntax to support any expression for Python and R
[ https://issues.apache.org/jira/browse/SPARK-21030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392049#comment-16392049 ] Dylan Guedes commented on SPARK-21030: -- So, I started, and here is my progress: [https://github.com/DylanGuedes/spark/commit/433c622ae987f2b6e2a9a5bc97a0addc0d938d4b] Could anyone give me a feedback/hints before I open the PR? > extend hint syntax to support any expression for Python and R > - > > Key: SPARK-21030 > URL: https://issues.apache.org/jira/browse/SPARK-21030 > Project: Spark > Issue Type: Improvement > Components: PySpark, SparkR, SQL >Affects Versions: 2.2.0 >Reporter: Felix Cheung >Priority: Major > > See SPARK-20854 > we need to relax checks in > https://github.com/apache/spark/blob/6cbc61d1070584ffbc34b1f53df352c9162f414a/python/pyspark/sql/dataframe.py#L422 > and > https://github.com/apache/spark/blob/7f203a248f94df6183a4bc4642a3d873171fef29/R/pkg/R/DataFrame.R#L3746 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23325) DataSourceV2 readers should always produce InternalRow.
[ https://issues.apache.org/jira/browse/SPARK-23325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392048#comment-16392048 ] Wenchen Fan commented on SPARK-23325: - It's hard to stabilize the binary format like `UnsafeRow` and `UnsafeArrayData`. We can skip them as most data sources won't use them because they are hard to write. > DataSourceV2 readers should always produce InternalRow. > --- > > Key: SPARK-23325 > URL: https://issues.apache.org/jira/browse/SPARK-23325 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Ryan Blue >Priority: Major > > DataSourceV2 row-oriented implementations are limited to producing either > {{Row}} instances or {{UnsafeRow}} instances by implementing > {{SupportsScanUnsafeRow}}. Instead, I think that implementations should > always produce {{InternalRow}}. > The problem with the choice between {{Row}} and {{UnsafeRow}} is that neither > one is appropriate for implementers. > File formats don't produce {{Row}} instances or the data values used by > {{Row}}, like {{java.sql.Timestamp}} and {{java.sql.Date}}. An implementation > that uses {{Row}} instances must produce data that is immediately translated > from the representation that was just produced by Spark. In my experience, it > made little sense to translate a timestamp in microseconds to a > (milliseconds, nanoseconds) pair, create a {{Timestamp}} instance, and pass > that instance to Spark for immediate translation back. > On the other hand, {{UnsafeRow}} is very difficult to produce unless data is > already held in memory. Even the Parquet support built into Spark > deserializes to {{InternalRow}} and then uses {{UnsafeProjection}} to produce > unsafe rows. 
When I went to build an implementation that deserializes Parquet > or Avro directly to {{UnsafeRow}} (I tried both), I found that it couldn't be > done without first deserializing into memory because the size of an array > must be known before any values are written. > I ended up deciding to deserialize to {{InternalRow}} and use > {{UnsafeProjection}} to convert to unsafe. There are two problems with this: > first, this is Scala and was difficult to call from Java (it required > reflection), and second, this causes double projection in the physical plan > (a copy for unsafe to unsafe) if there is a projection that wasn't fully > pushed to the data source. > I think the solution is to have a single interface for readers that expects > {{InternalRow}}. Then, a projection should be added in the Spark plan to > convert to unsafe and avoid projection in the plan and in the data source. If > the data source already produces unsafe rows by deserializing directly, this > still minimizes the number of copies because the unsafe projection will check > whether the incoming data is already {{UnsafeRow}}. > Using {{InternalRow}} would also match the interface on the write side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-21030) extend hint syntax to support any expression for Python and R
[ https://issues.apache.org/jira/browse/SPARK-21030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392049#comment-16392049 ] Dylan Guedes edited comment on SPARK-21030 at 3/8/18 10:50 PM: --- So, I started, and here is my progress: [https://github.com/DylanGuedes/spark/commit/433c622ae987f2b6e2a9a5bc97a0addc0d938d4b] Could anyone give me a feedback/hints before I open the PR? I'm not sure if my approach is correct. was (Author: dylanguedes): So, I started, and here is my progress: [https://github.com/DylanGuedes/spark/commit/433c622ae987f2b6e2a9a5bc97a0addc0d938d4b] Could anyone give me a feedback/hints before I open the PR? > extend hint syntax to support any expression for Python and R > - > > Key: SPARK-21030 > URL: https://issues.apache.org/jira/browse/SPARK-21030 > Project: Spark > Issue Type: Improvement > Components: PySpark, SparkR, SQL >Affects Versions: 2.2.0 >Reporter: Felix Cheung >Priority: Major > > See SPARK-20854 > we need to relax checks in > https://github.com/apache/spark/blob/6cbc61d1070584ffbc34b1f53df352c9162f414a/python/pyspark/sql/dataframe.py#L422 > and > https://github.com/apache/spark/blob/7f203a248f94df6183a4bc4642a3d873171fef29/R/pkg/R/DataFrame.R#L3746 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23325) DataSourceV2 readers should always produce InternalRow.
[ https://issues.apache.org/jira/browse/SPARK-23325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392046#comment-16392046 ] Wenchen Fan commented on SPARK-23325: - I think it's mostly document work. We need to add document for all the data classes, like `UTF8String`, `CalendarInterval`, `ArrayData`, `MapData`, etc. Also we need to put the data definition of all the data types in `InternalRow`, like timestamp type is a long representing microseconds from Unix epoch. We should create an umbrella Jira and parallel these work. > DataSourceV2 readers should always produce InternalRow. > --- > > Key: SPARK-23325 > URL: https://issues.apache.org/jira/browse/SPARK-23325 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Ryan Blue >Priority: Major > > DataSourceV2 row-oriented implementations are limited to producing either > {{Row}} instances or {{UnsafeRow}} instances by implementing > {{SupportsScanUnsafeRow}}. Instead, I think that implementations should > always produce {{InternalRow}}. > The problem with the choice between {{Row}} and {{UnsafeRow}} is that neither > one is appropriate for implementers. > File formats don't produce {{Row}} instances or the data values used by > {{Row}}, like {{java.sql.Timestamp}} and {{java.sql.Date}}. An implementation > that uses {{Row}} instances must produce data that is immediately translated > from the representation that was just produced by Spark. In my experience, it > made little sense to translate a timestamp in microseconds to a > (milliseconds, nanoseconds) pair, create a {{Timestamp}} instance, and pass > that instance to Spark for immediate translation back. > On the other hand, {{UnsafeRow}} is very difficult to produce unless data is > already held in memory. Even the Parquet support built into Spark > deserializes to {{InternalRow}} and then uses {{UnsafeProjection}} to produce > unsafe rows. 
When I went to build an implementation that deserializes Parquet > or Avro directly to {{UnsafeRow}} (I tried both), I found that it couldn't be > done without first deserializing into memory because the size of an array > must be known before any values are written. > I ended up deciding to deserialize to {{InternalRow}} and use > {{UnsafeProjection}} to convert to unsafe. There are two problems with this: > first, this is Scala and was difficult to call from Java (it required > reflection), and second, this causes double projection in the physical plan > (a copy for unsafe to unsafe) if there is a projection that wasn't fully > pushed to the data source. > I think the solution is to have a single interface for readers that expects > {{InternalRow}}. Then, a projection should be added in the Spark plan to > convert to unsafe and avoid projection in the plan and in the data source. If > the data source already produces unsafe rows by deserializing directly, this > still minimizes the number of copies because the unsafe projection will check > whether the incoming data is already {{UnsafeRow}}. > Using {{InternalRow}} would also match the interface on the write side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
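The timestamp round trip Ryan mentions can be made concrete. Per the comment above, Spark's internal timestamp is a long of microseconds since the Unix epoch, while java.sql.Timestamp exposes an epoch-milliseconds value plus a nanos field carrying the full sub-second fraction. The conversions below are an illustrative sketch, not Spark's actual DateTimeUtils code.

```python
# Illustrative conversions (not Spark's actual DateTimeUtils): internal
# timestamp = microseconds since the Unix epoch; java.sql.Timestamp-style
# input = epoch millis plus a nanos field with the full sub-second fraction.
def to_internal_micros(millis: int, nanos: int) -> int:
    seconds = millis // 1000                     # drop sub-second millis;
    return seconds * 1_000_000 + nanos // 1000   # nanos already carries it

def from_internal_micros(micros: int):
    seconds, frac_micros = divmod(micros, 1_000_000)
    millis = seconds * 1000 + frac_micros // 1000
    nanos = frac_micros * 1000                   # sub-microsecond lost
    return millis, nanos

micros = to_internal_micros(1_520_000_000_123, 123_456_789)
assert micros == 1_520_000_000_123_456
# Round-tripping truncates nanosecond precision to microseconds:
assert from_internal_micros(micros) == (1_520_000_000_123, 123_456_000)
```

The pointlessness of this immediate back-and-forth for every row is exactly the argument for letting sources produce InternalRow values directly.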
[jira] [Assigned] (SPARK-23615) Add maxDF Parameter to Python CountVectorizer
[ https://issues.apache.org/jira/browse/SPARK-23615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23615: Assignee: Apache Spark > Add maxDF Parameter to Python CountVectorizer > - > > Key: SPARK-23615 > URL: https://issues.apache.org/jira/browse/SPARK-23615 > Project: Spark > Issue Type: Improvement > Components: ML, PySpark >Affects Versions: 2.4.0 >Reporter: Bryan Cutler >Assignee: Apache Spark >Priority: Minor > > The maxDF parameter is for filtering out frequently occurring terms. This > param was recently added to the Scala CountVectorizer and needs to be added > to Python also. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-23615) Add maxDF Parameter to Python CountVectorizer
[ https://issues.apache.org/jira/browse/SPARK-23615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23615: Assignee: (was: Apache Spark) > Add maxDF Parameter to Python CountVectorizer > - > > Key: SPARK-23615 > URL: https://issues.apache.org/jira/browse/SPARK-23615 > Project: Spark > Issue Type: Improvement > Components: ML, PySpark >Affects Versions: 2.4.0 >Reporter: Bryan Cutler >Priority: Minor > > The maxDF parameter is for filtering out frequently occurring terms. This > param was recently added to the Scala CountVectorizer and needs to be added > to Python also. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23615) Add maxDF Parameter to Python CountVectorizer
[ https://issues.apache.org/jira/browse/SPARK-23615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392029#comment-16392029 ] Apache Spark commented on SPARK-23615: -- User 'huaxingao' has created a pull request for this issue: https://github.com/apache/spark/pull/20777 > Add maxDF Parameter to Python CountVectorizer > - > > Key: SPARK-23615 > URL: https://issues.apache.org/jira/browse/SPARK-23615 > Project: Spark > Issue Type: Improvement > Components: ML, PySpark >Affects Versions: 2.4.0 >Reporter: Bryan Cutler >Priority: Minor > > The maxDF parameter is for filtering out frequently occurring terms. This > param was recently added to the Scala CountVectorizer and needs to be added > to Python also. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23633) Update Pandas UDFs section in sql-programming-guide
Li Jin created SPARK-23633: -- Summary: Update Pandas UDFs section in sql-programming-guide Key: SPARK-23633 URL: https://issues.apache.org/jira/browse/SPARK-23633 Project: Spark Issue Type: Sub-task Components: PySpark Affects Versions: 2.4.0 Reporter: Li Jin Fix For: 2.4.0 Let's make sure sql-programming-guide is up-to-date before 2.4 release. https://github.com/apache/spark/blob/master/docs/sql-programming-guide.md#pandas-udfs-aka-vectorized-udfs -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23632) sparkR.session() error with spark packages - JVM is not ready after 10 seconds
Jaehyeon Kim created SPARK-23632: Summary: sparkR.session() error with spark packages - JVM is not ready after 10 seconds Key: SPARK-23632 URL: https://issues.apache.org/jira/browse/SPARK-23632 Project: Spark Issue Type: Bug Components: SparkR Affects Versions: 2.3.0, 2.2.1, 2.2.0 Reporter: Jaehyeon Kim Hi When I execute _sparkR.session()_ with _org.apache.hadoop:hadoop-aws:2.8.2_ as following, {code:java} library(SparkR, lib.loc=file.path(Sys.getenv('SPARK_HOME'),'R', 'lib')) ext_opts <- '-Dhttp.proxyHost=10.74.1.25 -Dhttp.proxyPort=8080 -Dhttps.proxyHost=10.74.1.25 -Dhttps.proxyPort=8080' sparkR.session(master = "spark://master:7077", appName = 'ml demo', sparkConfig = list(spark.driver.memory = '2g'), sparkPackages = 'org.apache.hadoop:hadoop-aws:2.8.2', spark.driver.extraJavaOptions = ext_opts) {code} I see *JVM is not ready after 10 seconds* error. Below shows some of the log messages. {code:java} Ivy Default Cache set to: /home/rstudio/.ivy2/cache The jars for the packages stored in: /home/rstudio/.ivy2/jars :: loading settings :: url = jar:file:/usr/local/spark-2.2.1/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml org.apache.hadoop#hadoop-aws added as a dependency :: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0 confs: [default] found org.apache.hadoop#hadoop-aws;2.8.2 in central ... ... found javax.servlet.jsp#jsp-api;2.1 in central Error in sparkR.sparkContext(master, appName, sparkHome, sparkConfigMap, : JVM is not ready after 10 seconds ... ... found joda-time#joda-time;2.9.4 in central downloading https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.8.2/hadoop-aws-2.8.2.jar ... ... ... 
xmlenc#xmlenc;0.52 from central in [default] - | |modules|| artifacts | | conf | number| search|dwnlded|evicted|| number|dwnlded| - | default | 76 | 76 | 76 | 0 || 76 | 76 | - :: retrieving :: org.apache.spark#spark-submit-parent confs: [default] 76 artifacts copied, 0 already retrieved (27334kB/56ms) {code} It's fine if I re-execute it after the package and its dependencies are downloaded. I consider it's because of this part - https://github.com/apache/spark/blob/master/R/pkg/R/sparkR.R#L181 {code:java} if (!file.exists(path)) { stop("JVM is not ready after 10 seconds") } {code} Just wonder if it may be possible to update so that a user can determine how much to wait? Thanks. Regards Jaehyeon -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
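The configurable-wait idea raised in the report above can be sketched as a polling loop with a caller-supplied timeout, in place of the fixed 10-second check. This is a Python illustration of the logic only; it is not SparkR code, and the function and parameter names are hypothetical.

```python
import time

# Sketch: poll for backend readiness up to a configurable deadline,
# instead of failing after a hard-coded 10 seconds.
def wait_for_backend(path_exists, timeout_s=10.0, poll_s=0.1):
    """path_exists: callable returning True once the backend has
    written its connection-info file; timeout_s: user-settable wait."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if path_exists():
            return True
        time.sleep(poll_s)
    raise TimeoutError("JVM is not ready after %s seconds" % timeout_s)

# A backend that is ready immediately returns without raising.
assert wait_for_backend(lambda: True, timeout_s=1.0) is True
```

A longer timeout would cover the case where spark-submit is still downloading packages and their dependencies on first use.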
[jira] [Assigned] (SPARK-23630) Spark-on-YARN missing user customizations of hadoop config
[ https://issues.apache.org/jira/browse/SPARK-23630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23630: Assignee: (was: Apache Spark) > Spark-on-YARN missing user customizations of hadoop config > -- > > Key: SPARK-23630 > URL: https://issues.apache.org/jira/browse/SPARK-23630 > Project: Spark > Issue Type: Bug > Components: YARN >Affects Versions: 2.3.0 >Reporter: Marcelo Vanzin >Priority: Major > > In my change to fix SPARK-22372, I removed some code that allowed user > customizations of hadoop configs to take effect. Notably, these lines in > YarnSparkHadoopUtil: > {noformat} > override def newConfiguration(conf: SparkConf): Configuration = { >val hadoopConf = new YarnConfiguration(super.newConfiguration(conf)) >hadoopConf.addResource(Client.SPARK_HADOOP_CONF_FILE) >hadoopConf > } > {noformat} > The {{addResource}} call is now missing so user configs are ignored (only the > NM config is used). -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23630) Spark-on-YARN missing user customizations of hadoop config
[ https://issues.apache.org/jira/browse/SPARK-23630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391938#comment-16391938 ] Apache Spark commented on SPARK-23630: -- User 'vanzin' has created a pull request for this issue: https://github.com/apache/spark/pull/20776 > Spark-on-YARN missing user customizations of hadoop config > -- > > Key: SPARK-23630 > URL: https://issues.apache.org/jira/browse/SPARK-23630 > Project: Spark > Issue Type: Bug > Components: YARN >Affects Versions: 2.3.0 >Reporter: Marcelo Vanzin >Priority: Major > > In my change to fix SPARK-22372, I removed some code that allowed user > customizations of hadoop configs to take effect. Notably, these lines in > YarnSparkHadoopUtil: > {noformat} > override def newConfiguration(conf: SparkConf): Configuration = { >val hadoopConf = new YarnConfiguration(super.newConfiguration(conf)) >hadoopConf.addResource(Client.SPARK_HADOOP_CONF_FILE) >hadoopConf > } > {noformat} > The {{addResource}} call is now missing so user configs are ignored (only the > NM config is used). -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-23630) Spark-on-YARN missing user customizations of hadoop config
[ https://issues.apache.org/jira/browse/SPARK-23630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23630: Assignee: Apache Spark > Spark-on-YARN missing user customizations of hadoop config > -- > > Key: SPARK-23630 > URL: https://issues.apache.org/jira/browse/SPARK-23630 > Project: Spark > Issue Type: Bug > Components: YARN >Affects Versions: 2.3.0 >Reporter: Marcelo Vanzin >Assignee: Apache Spark >Priority: Major > > In my change to fix SPARK-22372, I removed some code that allowed user > customizations of hadoop configs to take effect. Notably, these lines in > YarnSparkHadoopUtil: > {noformat} > override def newConfiguration(conf: SparkConf): Configuration = { >val hadoopConf = new YarnConfiguration(super.newConfiguration(conf)) >hadoopConf.addResource(Client.SPARK_HADOOP_CONF_FILE) >hadoopConf > } > {noformat} > The {{addResource}} call is now missing so user configs are ignored (only the > NM config is used). -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23630) Spark-on-YARN missing user customizations of hadoop config
[ https://issues.apache.org/jira/browse/SPARK-23630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marcelo Vanzin updated SPARK-23630: --- Description: In my change to fix SPARK-22372, I removed some code that allowed user customizations of hadoop configs to take effect. Notably, these lines in YarnSparkHadoopUtil: {noformat} override def newConfiguration(conf: SparkConf): Configuration = { val hadoopConf = new YarnConfiguration(super.newConfiguration(conf)) hadoopConf.addResource(Client.SPARK_HADOOP_CONF_FILE) hadoopConf } {noformat} The {{addResource}} call is now missing so user configs are ignored (only the NM config is used). was: In my change to fix SPARK-22372, I removed some code that allowed user customizations of hadoop configs to take effect. Notably, these lines in YarnSparkHadoopUtil: {noformat} override def newConfiguration(conf: SparkConf): Configuration = { val hadoopConf = new YarnConfiguration(super.newConfiguration(conf)) hadoopConf.addResource(Client.SPARK_HADOOP_CONF_FILE) hadoopConf } {noformat} The {{addResource}} call is now missing to user configs are ignored (only the NM config is used). > Spark-on-YARN missing user customizations of hadoop config > -- > > Key: SPARK-23630 > URL: https://issues.apache.org/jira/browse/SPARK-23630 > Project: Spark > Issue Type: Bug > Components: YARN >Affects Versions: 2.3.0 >Reporter: Marcelo Vanzin >Priority: Major > > In my change to fix SPARK-22372, I removed some code that allowed user > customizations of hadoop configs to take effect. Notably, these lines in > YarnSparkHadoopUtil: > {noformat} > override def newConfiguration(conf: SparkConf): Configuration = { >val hadoopConf = new YarnConfiguration(super.newConfiguration(conf)) >hadoopConf.addResource(Client.SPARK_HADOOP_CONF_FILE) >hadoopConf > } > {noformat} > The {{addResource}} call is now missing so user configs are ignored (only the > NM config is used). 
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
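The layered-configuration behavior at stake in this issue can be sketched with plain dictionaries standing in for Hadoop's Configuration objects: the resource added later (the Spark-propagated user config) should override or extend the base NodeManager config. This is an illustration of the merge semantics only, not Hadoop or Spark code.

```python
# Sketch of addResource-style layering: start from the NodeManager's
# base config, then overlay the user's config (the step the fix
# restores). Later entries win for duplicated keys.
def new_configuration(nm_conf, user_conf=None):
    conf = dict(nm_conf)          # base: NM-provided hadoop config
    if user_conf is not None:
        conf.update(user_conf)    # user customizations take effect
    return conf

nm = {"fs.defaultFS": "hdfs://nn", "io.file.buffer.size": "4096"}
user = {"io.file.buffer.size": "65536"}
merged = new_configuration(nm, user)
assert merged["io.file.buffer.size"] == "65536"  # user override wins
assert merged["fs.defaultFS"] == "hdfs://nn"     # base values preserved
```

Without the overlay step, only the NM config would be visible, which is exactly the regression described.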
[jira] [Resolved] (SPARK-23602) PrintToStderr should behave the same in interpreted mode
[ https://issues.apache.org/jira/browse/SPARK-23602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Herman van Hovell resolved SPARK-23602. --- Resolution: Fixed Assignee: Marco Gaido Fix Version/s: 2.4.0 > PrintToStderr should behave the same in interpreted mode > > > Key: SPARK-23602 > URL: https://issues.apache.org/jira/browse/SPARK-23602 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Herman van Hovell >Assignee: Marco Gaido >Priority: Trivial > Fix For: 2.4.0 > > > The {{PrintToStderr}} behaves differently for the interpreted and code > generated code paths. We should fix this. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23549) Spark SQL unexpected behavior when comparing timestamp to date
[ https://issues.apache.org/jira/browse/SPARK-23549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391877#comment-16391877 ] Dongjoon Hyun commented on SPARK-23549: --- Thank you for reporting and making a patch for this, [~djiangxu] and [~kiszk]. I checked Hive and saw that Hive 1.2~2.1 also had this issue and it's fixed at Hive 2.0.0. > Spark SQL unexpected behavior when comparing timestamp to date > -- > > Key: SPARK-23549 > URL: https://issues.apache.org/jira/browse/SPARK-23549 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.3, 2.0.2, 2.1.2, 2.2.1, 2.3.0 >Reporter: Dong Jiang >Priority: Major > > {code:java} > scala> spark.version > res1: String = 2.2.1 > scala> spark.sql("select cast('2017-03-01 00:00:00' as timestamp) between > cast('2017-02-28' as date) and cast('2017-03-01' as date)").show > +---+ > |((CAST(CAST(2017-03-01 00:00:00 AS TIMESTAMP) AS STRING) >= > CAST(CAST(2017-02-28 AS DATE) AS STRING)) AND (CAST(CAST(2017-03-01 00:00:00 > AS TIMESTAMP) AS STRING) <= CAST(CAST(2017-03-01 AS DATE) AS STRING)))| > +---+ > | > > false| > +---+{code} > As shown above, when a timestamp is compared to date in SparkSQL, both > timestamp and date are downcast to string, and leading to unexpected result. > If run the same SQL in presto/Athena, I got the expected result > {code:java} > select cast('2017-03-01 00:00:00' as timestamp) between cast('2017-02-28' as > date) and cast('2017-03-01' as date) > _col0 > 1 true > {code} > Is this a bug for Spark or a feature? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
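The downcast-to-string pitfall in this issue can be reproduced in pure Python: under lexicographic string comparison, the timestamp string sorts after its own date prefix, so the BETWEEN check fails even though the instant lies in the range.

```python
from datetime import date, datetime

# String comparison, mirroring what Spark SQL does when it casts both
# sides of the BETWEEN to string.
ts = "2017-03-01 00:00:00"
lo, hi = "2017-02-28", "2017-03-01"
between_as_strings = (ts >= lo) and (ts <= hi)
print(between_as_strings)  # False: "2017-03-01 00:00:00" > "2017-03-01"

# Comparing real date/datetime values gives the expected answer.
ts_v = datetime(2017, 3, 1, 0, 0, 0)
between_as_dates = date(2017, 2, 28) <= ts_v.date() <= date(2017, 3, 1)
print(between_as_dates)  # True
```

This matches the Presto/Athena result quoted in the issue, where the comparison is done on temporal types rather than strings.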
[jira] [Created] (SPARK-23631) Add summary to RandomForestClassificationModel
Evan Zamir created SPARK-23631: -- Summary: Add summary to RandomForestClassificationModel Key: SPARK-23631 URL: https://issues.apache.org/jira/browse/SPARK-23631 Project: Spark Issue Type: New Feature Components: ML Affects Versions: 2.3.0 Reporter: Evan Zamir I'm using the RandomForestClassificationModel and noticed that there is no summary attribute like there is for LogisticRegressionModel. Specifically, I'd like to have the roc and pr curves. Is that on the Spark roadmap anywhere? Is there a reason it hasn't been implemented? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
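For context on what the requested summary would expose, an ROC curve is just (FPR, TPR) points swept over score thresholds. The sketch below computes those points in pure Python; it is an illustration of the computation, not the spark.ml implementation, and the function name is hypothetical.

```python
# Sketch: ROC points from predicted scores (P(label=1)) and 0/1 labels.
# Sort by descending score and accumulate true/false positives.
def roc_points(scores, labels):
    pairs = sorted(zip(scores, labels), reverse=True)
    pos = sum(labels)
    neg = len(labels) - pos
    tp = fp = 0
    points = [(0.0, 0.0)]
    for _, y in pairs:
        if y == 1:
            tp += 1
        else:
            fp += 1
        points.append((fp / neg, tp / pos))
    return points

pts = roc_points([0.9, 0.8, 0.3, 0.1], [1, 1, 0, 0])
assert pts[-1] == (1.0, 1.0)   # the curve always ends at (1, 1)
assert (0.0, 1.0) in pts       # perfect separation passes through (0, 1)
```

A PR curve would be computed the same way, tracking precision and recall instead of FPR and TPR.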
[jira] [Updated] (SPARK-23549) Spark SQL unexpected behavior when comparing timestamp to date
[ https://issues.apache.org/jira/browse/SPARK-23549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-23549: -- Affects Version/s: 1.6.3 > Spark SQL unexpected behavior when comparing timestamp to date > -- > > Key: SPARK-23549 > URL: https://issues.apache.org/jira/browse/SPARK-23549 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.3, 2.0.2, 2.1.2, 2.2.1, 2.3.0 >Reporter: Dong Jiang >Priority: Major > > {code:java} > scala> spark.version > res1: String = 2.2.1 > scala> spark.sql("select cast('2017-03-01 00:00:00' as timestamp) between > cast('2017-02-28' as date) and cast('2017-03-01' as date)").show > +---+ > |((CAST(CAST(2017-03-01 00:00:00 AS TIMESTAMP) AS STRING) >= > CAST(CAST(2017-02-28 AS DATE) AS STRING)) AND (CAST(CAST(2017-03-01 00:00:00 > AS TIMESTAMP) AS STRING) <= CAST(CAST(2017-03-01 AS DATE) AS STRING)))| > +---+ > | > > false| > +---+{code} > As shown above, when a timestamp is compared to date in SparkSQL, both > timestamp and date are downcast to string, and leading to unexpected result. > If run the same SQL in presto/Athena, I got the expected result > {code:java} > select cast('2017-03-01 00:00:00' as timestamp) between cast('2017-02-28' as > date) and cast('2017-03-01' as date) > _col0 > 1 true > {code} > Is this a bug for Spark or a feature? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23549) Spark SQL unexpected behavior when comparing timestamp to date
[ https://issues.apache.org/jira/browse/SPARK-23549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-23549: -- Affects Version/s: 2.0.2 > Spark SQL unexpected behavior when comparing timestamp to date > -- > > Key: SPARK-23549 > URL: https://issues.apache.org/jira/browse/SPARK-23549 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2, 2.1.2, 2.2.1, 2.3.0 >Reporter: Dong Jiang >Priority: Major > > {code:java} > scala> spark.version > res1: String = 2.2.1 > scala> spark.sql("select cast('2017-03-01 00:00:00' as timestamp) between > cast('2017-02-28' as date) and cast('2017-03-01' as date)").show > +---+ > |((CAST(CAST(2017-03-01 00:00:00 AS TIMESTAMP) AS STRING) >= > CAST(CAST(2017-02-28 AS DATE) AS STRING)) AND (CAST(CAST(2017-03-01 00:00:00 > AS TIMESTAMP) AS STRING) <= CAST(CAST(2017-03-01 AS DATE) AS STRING)))| > +---+ > | > > false| > +---+{code} > As shown above, when a timestamp is compared to date in SparkSQL, both > timestamp and date are downcast to string, and leading to unexpected result. > If run the same SQL in presto/Athena, I got the expected result > {code:java} > select cast('2017-03-01 00:00:00' as timestamp) between cast('2017-02-28' as > date) and cast('2017-03-01' as date) > _col0 > 1 true > {code} > Is this a bug for Spark or a feature? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23549) Spark SQL unexpected behavior when comparing timestamp to date
[ https://issues.apache.org/jira/browse/SPARK-23549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-23549: -- Affects Version/s: 2.1.2 > Spark SQL unexpected behavior when comparing timestamp to date > -- > > Key: SPARK-23549 > URL: https://issues.apache.org/jira/browse/SPARK-23549 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.2, 2.2.1, 2.3.0 >Reporter: Dong Jiang >Priority: Major > > {code:java} > scala> spark.version > res1: String = 2.2.1 > scala> spark.sql("select cast('2017-03-01 00:00:00' as timestamp) between > cast('2017-02-28' as date) and cast('2017-03-01' as date)").show > +---+ > |((CAST(CAST(2017-03-01 00:00:00 AS TIMESTAMP) AS STRING) >= > CAST(CAST(2017-02-28 AS DATE) AS STRING)) AND (CAST(CAST(2017-03-01 00:00:00 > AS TIMESTAMP) AS STRING) <= CAST(CAST(2017-03-01 AS DATE) AS STRING)))| > +---+ > | > > false| > +---+{code} > As shown above, when a timestamp is compared to date in SparkSQL, both > timestamp and date are downcast to string, and leading to unexpected result. > If run the same SQL in presto/Athena, I got the expected result > {code:java} > select cast('2017-03-01 00:00:00' as timestamp) between cast('2017-02-28' as > date) and cast('2017-03-01' as date) > _col0 > 1 true > {code} > Is this a bug for Spark or a feature? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23549) Spark SQL unexpected behavior when comparing timestamp to date
[ https://issues.apache.org/jira/browse/SPARK-23549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-23549: -- Affects Version/s: 2.3.0 > Spark SQL unexpected behavior when comparing timestamp to date > -- > > Key: SPARK-23549 > URL: https://issues.apache.org/jira/browse/SPARK-23549 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.1, 2.3.0 >Reporter: Dong Jiang >Priority: Major > > {code:java} > scala> spark.version > res1: String = 2.2.1 > scala> spark.sql("select cast('2017-03-01 00:00:00' as timestamp) between > cast('2017-02-28' as date) and cast('2017-03-01' as date)").show > +---+ > |((CAST(CAST(2017-03-01 00:00:00 AS TIMESTAMP) AS STRING) >= > CAST(CAST(2017-02-28 AS DATE) AS STRING)) AND (CAST(CAST(2017-03-01 00:00:00 > AS TIMESTAMP) AS STRING) <= CAST(CAST(2017-03-01 AS DATE) AS STRING)))| > +---+ > | > > false| > +---+{code} > As shown above, when a timestamp is compared to date in SparkSQL, both > timestamp and date are downcast to string, and leading to unexpected result. > If run the same SQL in presto/Athena, I got the expected result > {code:java} > select cast('2017-03-01 00:00:00' as timestamp) between cast('2017-02-28' as > date) and cast('2017-03-01' as date) > _col0 > 1 true > {code} > Is this a bug for Spark or a feature? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23325) DataSourceV2 readers should always produce InternalRow.
[ https://issues.apache.org/jira/browse/SPARK-23325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391861#comment-16391861 ] Michael Armbrust commented on SPARK-23325: -- It doesn't seem like it would be that hard to stabilize at least the generic form of InternalRow, or am I missing something? > DataSourceV2 readers should always produce InternalRow. > --- > > Key: SPARK-23325 > URL: https://issues.apache.org/jira/browse/SPARK-23325 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Ryan Blue >Priority: Major > > DataSourceV2 row-oriented implementations are limited to producing either > {{Row}} instances or {{UnsafeRow}} instances by implementing > {{SupportsScanUnsafeRow}}. Instead, I think that implementations should > always produce {{InternalRow}}. > The problem with the choice between {{Row}} and {{UnsafeRow}} is that neither > one is appropriate for implementers. > File formats don't produce {{Row}} instances or the data values used by > {{Row}}, like {{java.sql.Timestamp}} and {{java.sql.Date}}. An implementation > that uses {{Row}} instances must produce data that is immediately translated > from the representation that was just produced by Spark. In my experience, it > made little sense to translate a timestamp in microseconds to a > (milliseconds, nanoseconds) pair, create a {{Timestamp}} instance, and pass > that instance to Spark for immediate translation back. > On the other hand, {{UnsafeRow}} is very difficult to produce unless data is > already held in memory. Even the Parquet support built into Spark > deserializes to {{InternalRow}} and then uses {{UnsafeProjection}} to produce > unsafe rows. When I went to build an implementation that deserializes Parquet > or Avro directly to {{UnsafeRow}} (I tried both), I found that it couldn't be > done without first deserializing into memory because the size of an array > must be known before any values are written. 
> I ended up deciding to deserialize to {{InternalRow}} and use > {{UnsafeProjection}} to convert to unsafe. There are two problems with this: > first, this is Scala and was difficult to call from Java (it required > reflection), and second, this causes double projection in the physical plan > (a copy for unsafe to unsafe) if there is a projection that wasn't fully > pushed to the data source. > I think the solution is to have a single interface for readers that expects > {{InternalRow}}. Then, a projection should be added in the Spark plan to > convert to unsafe and avoid projection in the plan and in the data source. If > the data source already produces unsafe rows by deserializing directly, this > still minimizes the number of copies because the unsafe projection will check > whether the incoming data is already {{UnsafeRow}}. > Using {{InternalRow}} would also match the interface on the write side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23598) WholeStageCodegen can lead to IllegalAccessError calling append for HashAggregateExec
[ https://issues.apache.org/jira/browse/SPARK-23598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391834#comment-16391834 ] Kazuaki Ishizaki commented on SPARK-23598: -- Thanks, I confirmed that I can reproduce this issue with the master. > WholeStageCodegen can lead to IllegalAccessError calling append for > HashAggregateExec > -- > > Key: SPARK-23598 > URL: https://issues.apache.org/jira/browse/SPARK-23598 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: David Vogelbacher >Priority: Major > > Got the following stacktrace for a large QueryPlan using WholeStageCodeGen: > {noformat} > java.lang.IllegalAccessError: tried to access method > org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V > from class > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass.agg_doAggregateWithKeysOutput$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) > at org.apache.spark.scheduler.Task.run(Task.scala:109) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345){noformat} > After disabling codegen, everything works. 
> The root cause seems to be that we are trying to call the protected _append_ > method of > [BufferedRowIterator|https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java#L68] > from an inner-class of a sub-class that is loaded by a different > class-loader (after codegen compilation). > [https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-5.html#jvms-5.4.4] > states that a protected method _R_ can be accessed only if one of the > following two conditions is fulfilled: > # R is protected and is declared in a class C, and D is either a subclass of > C or C itself. Furthermore, if R is not static, then the symbolic reference > to R must contain a symbolic reference to a class T, such that T is either a > subclass of D, a superclass of D, or D itself. > # R is either protected or has default access (that is, neither public nor > protected nor private), and is declared by a class in the same run-time > package as D. > 2.) doesn't apply as we have loaded the class with a different class loader > (and are in a different package) and 1.) doesn't apply because we are > apparently trying to call the method from an inner class of a subclass of > _BufferedRowIterator_. > Looking at the Code path of _WholeStageCodeGen_, the following happens: > # In > [WholeStageCodeGen|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L527], > we create the subclass of _BufferedRowIterator_, along with a _processNext_ > method for processing the output of the child plan. 
> # In the child, which is a > [HashAggregateExec|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L517], > we create the method which shows up at the top of the stack trace (called > _doAggregateWithKeysOutput_ ) > # We add this method to the compiled code invoking _addNewFunction_ of > [CodeGenerator|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L460] > In the generated function body we call the _append_ method.| > Now, the _addNewFunction_ method states that: > {noformat} > If the code for the `OuterClass` grows too large, the function will be > inlined into a new private, inner class > {noformat} > This indeed seems to happen: the _doAggregateWithKeysOutput_ method is put > into a new private inner class. Thus, it doesn't have access to the protected > _append_ method anymore but still tries to call it, which results in the > _IllegalAccessError._ > Possible fixes: > * Pass in the _inlineToOuterClass_ flag when invoking the
[jira] [Commented] (SPARK-23325) DataSourceV2 readers should always produce InternalRow.
[ https://issues.apache.org/jira/browse/SPARK-23325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391768#comment-16391768 ] Ryan Blue commented on SPARK-23325: --- By exposing an interface that uses UnsafeRow, don't we already have this problem? The only difference is that UnsafeRow is harder to produce. We also have a write interface in v2 that exposes InternalRow. I think now is the time to start documenting these so we can officially support InternalRow instead of effectively supporting InternalRow. UnsafeRow would benefit from more documentation, too. To find out how to use the write interfaces, I ended up using EXPLAIN CODEGEN on a bunch of different queries and looking at the results, then inspecting the writers to find out the in-memory representation. As for the columnar format, I see that as a nice-to-have. The v2 API is based on rows for a good reason, and we need to document and support that row format. Unless we are going to change v2 to a columnar API, stabilizing and documenting the columnar format doesn't help much. What work needs to be done here to make InternalRow viable? If it is to document the values used to internally represent different types, I can help out with that. I already have matching representations documented in the Iceberg spec anyway. > DataSourceV2 readers should always produce InternalRow. > --- > > Key: SPARK-23325 > URL: https://issues.apache.org/jira/browse/SPARK-23325 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Ryan Blue >Priority: Major > > DataSourceV2 row-oriented implementations are limited to producing either > {{Row}} instances or {{UnsafeRow}} instances by implementing > {{SupportsScanUnsafeRow}}. Instead, I think that implementations should > always produce {{InternalRow}}. > The problem with the choice between {{Row}} and {{UnsafeRow}} is that neither > one is appropriate for implementers. 
> File formats don't produce {{Row}} instances or the data values used by > {{Row}}, like {{java.sql.Timestamp}} and {{java.sql.Date}}. An implementation > that uses {{Row}} instances must produce data that is immediately translated > from the representation that was just produced by Spark. In my experience, it > made little sense to translate a timestamp in microseconds to a > (milliseconds, nanoseconds) pair, create a {{Timestamp}} instance, and pass > that instance to Spark for immediate translation back. > On the other hand, {{UnsafeRow}} is very difficult to produce unless data is > already held in memory. Even the Parquet support built into Spark > deserializes to {{InternalRow}} and then uses {{UnsafeProjection}} to produce > unsafe rows. When I went to build an implementation that deserializes Parquet > or Avro directly to {{UnsafeRow}} (I tried both), I found that it couldn't be > done without first deserializing into memory because the size of an array > must be known before any values are written. > I ended up deciding to deserialize to {{InternalRow}} and use > {{UnsafeProjection}} to convert to unsafe. There are two problems with this: > first, this is Scala and was difficult to call from Java (it required > reflection), and second, this causes double projection in the physical plan > (a copy for unsafe to unsafe) if there is a projection that wasn't fully > pushed to the data source. > I think the solution is to have a single interface for readers that expects > {{InternalRow}}. Then, a projection should be added in the Spark plan to > convert to unsafe and avoid projection in the plan and in the data source. If > the data source already produces unsafe rows by deserializing directly, this > still minimizes the number of copies because the unsafe projection will check > whether the incoming data is already {{UnsafeRow}}. > Using {{InternalRow}} would also match the interface on the write side. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23630) Spark-on-YARN missing user customizations of hadoop config
Marcelo Vanzin created SPARK-23630: -- Summary: Spark-on-YARN missing user customizations of hadoop config Key: SPARK-23630 URL: https://issues.apache.org/jira/browse/SPARK-23630 Project: Spark Issue Type: Bug Components: YARN Affects Versions: 2.3.0 Reporter: Marcelo Vanzin In my change to fix SPARK-22372, I removed some code that allowed user customizations of hadoop configs to take effect. Notably, these lines in YarnSparkHadoopUtil: {noformat} override def newConfiguration(conf: SparkConf): Configuration = { val hadoopConf = new YarnConfiguration(super.newConfiguration(conf)) hadoopConf.addResource(Client.SPARK_HADOOP_CONF_FILE) hadoopConf } {noformat} The {{addResource}} call is now missing so user configs are ignored (only the NM config is used). -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-14681) Provide label/impurity stats for spark.ml decision tree nodes
[ https://issues.apache.org/jira/browse/SPARK-14681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391762#comment-16391762 ] Joseph K. Bradley commented on SPARK-14681: --- [~WeichenXu123] Thanks for the PR! I'll comment on the design here in the JIRA. From your PR: {code} class TreeClassifierStatInfo def getLabelCount(label: Int): Double class TreeRegressorStatInfo def getCount(): Double def getSum(): Double def getSquareSum(): Double class Node +++ def statInfo: TreeStatInfo trait TreeStatInfo def asTreeClassifierStatInfo: TreeClassifierStatInfo def asTreeRegressorStatInfo: TreeRegressorStatInfo {code} I have a few thoughts: * I like the overall approach of using classes instead of just returning plain double arrays. * This will require users to explicitly cast TreeStatInfo to the classifier/regressor type. Would it be possible to avoid that without breaking APIs, e.g., by having a ClassificationNode and a RegressionNode inheriting from Node? * Naming: What about using "Stats" or "Statistics" instead of "StatInfo?" I just feel the "Info" part is uninformative. > Provide label/impurity stats for spark.ml decision tree nodes > - > > Key: SPARK-14681 > URL: https://issues.apache.org/jira/browse/SPARK-14681 > Project: Spark > Issue Type: Improvement > Components: ML >Reporter: Joseph K. Bradley >Priority: Major > > Currently, spark.ml decision trees provide all node info except for the > aggregated stats about labels and impurities. This task is to provide those > publicly. We need to choose a good API for it, so we should discuss the > design on this issue before implementing it. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
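A rough sketch of the second bullet above, in plain Scala with illustrative names (this is not a proposed spark.ml signature): if each {{Node}} subtype refines the type of its stats member, callers get the classifier- or regressor-specific accessors without an explicit cast.

```scala
// Illustrative names only; not actual or proposed Spark API.
sealed trait TreeStats
final case class ClassifierStats(labelCounts: Array[Double]) extends TreeStats {
  def getLabelCount(label: Int): Double = labelCounts(label)
}
final case class RegressorStats(count: Double, sum: Double, squareSum: Double) extends TreeStats

// Each node subtype narrows the declared stats type (a covariant override),
// so no asInstanceOf is needed at the call site.
abstract class Node { def stats: TreeStats }
final class ClassificationNode(override val stats: ClassifierStats) extends Node
final class RegressionNode(override val stats: RegressorStats) extends Node

val c = new ClassificationNode(ClassifierStats(Array(3.0, 7.0)))
val majorityCount = c.stats.getLabelCount(1) // typed access, no cast
```

The trade-off is that existing code typed against plain `Node` keeps working, while new code that holds a `ClassificationNode` or `RegressionNode` reference gets the precise stats type for free.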
[jira] [Comment Edited] (SPARK-23598) WholeStageCodegen can lead to IllegalAccessError calling append for HashAggregateExec
[ https://issues.apache.org/jira/browse/SPARK-23598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391709#comment-16391709 ] David Vogelbacher edited comment on SPARK-23598 at 3/8/18 6:41 PM: --- [~mgaido] {{HashAggregateExec}} calls {{addNewFunction}}, which calls {{addNewFunctionInternal}} which uses that flag and checks if the current size is bigger than {{GENERATED_CLASS_SIZE_THRESHOLD}} ([see|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L478]) I just compiled develop with {{GENERATED_CLASS_SIZE_THRESHOLD}} set to -1 and was able to reproduce (cc [~hvanhovell]) . I applied the following diff before compiling: {noformat} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 793824b0b0..7fad817d89 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -1167,7 +1167,7 @@ object CodeGenerator extends Logging { // limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a // threshold of 1000k bytes to determine when a function should be inlined to a private, inner // class. - final val GENERATED_CLASS_SIZE_THRESHOLD = 100 + final val GENERATED_CLASS_SIZE_THRESHOLD = -1 // This is the threshold for the number of global variables, whose types are primitive type or // complex type (e.g. 
more than one-dimensional array), that will be placed at the outer class (END) {noformat} Then, I executed a simple groupBy-Aggregate in the spark-shell and got the same error: {noformat} ➜ spark git:(master) ✗ ./bin/spark-shell 18/03/08 18:30:24 WARN Utils: Your hostname, dvogelbac resolves to a loopback address: 127.0.0.1; using 10.111.11.111 instead (on interface en0) 18/03/08 18:30:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 18/03/08 18:30:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Spark context Web UI available at http://10.111.11.111:4040 Spark context available as 'sc' (master = local[*], app id = local-1520533829643). Spark session available as 'spark'. Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.4.0-SNAPSHOT /_/ Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_121) Type in expressions to have them evaluated. Type :help for more information. 
scala> spark.conf.set("spark.sql.codegen.wholeStage", true) scala> val df_pet_age = Seq( | (8, "bat"), | (5, "bat"), | (15, "bat"), | (30, "mouse"), | (15, "mouse"), | (23, "mouse"), | (8, "horse"), | (-5, "horse") | ).toDF("age", "name") df_pet_age: org.apache.spark.sql.DataFrame = [age: int, name: string] scala> df_pet_age.groupBy("name").avg("age").show() 18/03/08 18:31:20 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1) java.lang.IllegalAccessError: tried to access method org.apache.spark.sql.execution.BufferedRowIterator.stopEarly()Z from class org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$agg_NestedClass1 at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$agg_NestedClass1.agg_doAggregateWithKeys$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:616) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at
[jira] [Assigned] (SPARK-23549) Spark SQL unexpected behavior when comparing timestamp to date
[ https://issues.apache.org/jira/browse/SPARK-23549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23549: Assignee: (was: Apache Spark) > Spark SQL unexpected behavior when comparing timestamp to date > -- > > Key: SPARK-23549 > URL: https://issues.apache.org/jira/browse/SPARK-23549 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.1 >Reporter: Dong Jiang >Priority: Major > > {code:java} > scala> spark.version > res1: String = 2.2.1 > scala> spark.sql("select cast('2017-03-01 00:00:00' as timestamp) between > cast('2017-02-28' as date) and cast('2017-03-01' as date)").show > +---+ > |((CAST(CAST(2017-03-01 00:00:00 AS TIMESTAMP) AS STRING) >= > CAST(CAST(2017-02-28 AS DATE) AS STRING)) AND (CAST(CAST(2017-03-01 00:00:00 > AS TIMESTAMP) AS STRING) <= CAST(CAST(2017-03-01 AS DATE) AS STRING)))| > +---+ > | > > false| > +---+{code} > As shown above, when a timestamp is compared to date in SparkSQL, both > timestamp and date are downcast to string, and leading to unexpected result. > If run the same SQL in presto/Athena, I got the expected result > {code:java} > select cast('2017-03-01 00:00:00' as timestamp) between cast('2017-02-28' as > date) and cast('2017-03-01' as date) > _col0 > 1 true > {code} > Is this a bug for Spark or a feature? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-23549) Spark SQL unexpected behavior when comparing timestamp to date
[ https://issues.apache.org/jira/browse/SPARK-23549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23549: Assignee: Apache Spark > Spark SQL unexpected behavior when comparing timestamp to date > -- > > Key: SPARK-23549 > URL: https://issues.apache.org/jira/browse/SPARK-23549 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.1 >Reporter: Dong Jiang >Assignee: Apache Spark >Priority: Major > > {code:java} > scala> spark.version > res1: String = 2.2.1 > scala> spark.sql("select cast('2017-03-01 00:00:00' as timestamp) between > cast('2017-02-28' as date) and cast('2017-03-01' as date)").show > +---+ > |((CAST(CAST(2017-03-01 00:00:00 AS TIMESTAMP) AS STRING) >= > CAST(CAST(2017-02-28 AS DATE) AS STRING)) AND (CAST(CAST(2017-03-01 00:00:00 > AS TIMESTAMP) AS STRING) <= CAST(CAST(2017-03-01 AS DATE) AS STRING)))| > +---+ > | > > false| > +---+{code} > As shown above, when a timestamp is compared to date in SparkSQL, both > timestamp and date are downcast to string, and leading to unexpected result. > If run the same SQL in presto/Athena, I got the expected result > {code:java} > select cast('2017-03-01 00:00:00' as timestamp) between cast('2017-02-28' as > date) and cast('2017-03-01' as date) > _col0 > 1 true > {code} > Is this a bug for Spark or a feature? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23549) Spark SQL unexpected behavior when comparing timestamp to date
[ https://issues.apache.org/jira/browse/SPARK-23549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391686#comment-16391686 ] Apache Spark commented on SPARK-23549: -- User 'kiszk' has created a pull request for this issue: https://github.com/apache/spark/pull/20774 > Spark SQL unexpected behavior when comparing timestamp to date > -- > > Key: SPARK-23549 > URL: https://issues.apache.org/jira/browse/SPARK-23549 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.1 >Reporter: Dong Jiang >Priority: Major > > {code:java} > scala> spark.version > res1: String = 2.2.1 > scala> spark.sql("select cast('2017-03-01 00:00:00' as timestamp) between > cast('2017-02-28' as date) and cast('2017-03-01' as date)").show > +---+ > |((CAST(CAST(2017-03-01 00:00:00 AS TIMESTAMP) AS STRING) >= > CAST(CAST(2017-02-28 AS DATE) AS STRING)) AND (CAST(CAST(2017-03-01 00:00:00 > AS TIMESTAMP) AS STRING) <= CAST(CAST(2017-03-01 AS DATE) AS STRING)))| > +---+ > | > > false| > +---+{code} > As shown above, when a timestamp is compared to date in SparkSQL, both > timestamp and date are downcast to string, and leading to unexpected result. > If run the same SQL in presto/Athena, I got the expected result > {code:java} > select cast('2017-03-01 00:00:00' as timestamp) between cast('2017-02-28' as > date) and cast('2017-03-01' as date) > _col0 > 1 true > {code} > Is this a bug for Spark or a feature? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-16630) Blacklist a node if executors won't launch on it.
[ https://issues.apache.org/jira/browse/SPARK-16630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391649#comment-16391649 ] Attila Zsolt Piros commented on SPARK-16630: I am working on this issue. > Blacklist a node if executors won't launch on it. > - > > Key: SPARK-16630 > URL: https://issues.apache.org/jira/browse/SPARK-16630 > Project: Spark > Issue Type: Improvement > Components: YARN >Affects Versions: 1.6.2 >Reporter: Thomas Graves >Priority: Major > > On YARN, its possible that a node is messed or misconfigured such that a > container won't launch on it. For instance if the Spark external shuffle > handler didn't get loaded on it , maybe its just some other hardware issue or > hadoop configuration issue. > It would be nice we could recognize this happening and stop trying to launch > executors on it since that could end up causing us to hit our max number of > executor failures and then kill the job. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-23549) Spark SQL unexpected behavior when comparing timestamp to date
[ https://issues.apache.org/jira/browse/SPARK-23549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391386#comment-16391386 ] Kazuaki Ishizaki edited comment on SPARK-23549 at 3/8/18 5:33 PM: -- I see. Make sense. It would be good to cast `TimestampType` when we compare `DateType` with `TimestampType`. I will submit a PR. was (Author: kiszk): I see. Make sense. It would be good to cast `DateType` when we compare `DateType` with `TimestampType`. I will submit a PR. > Spark SQL unexpected behavior when comparing timestamp to date > -- > > Key: SPARK-23549 > URL: https://issues.apache.org/jira/browse/SPARK-23549 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.1 >Reporter: Dong Jiang >Priority: Major > > {code:java} > scala> spark.version > res1: String = 2.2.1 > scala> spark.sql("select cast('2017-03-01 00:00:00' as timestamp) between > cast('2017-02-28' as date) and cast('2017-03-01' as date)").show > +---+ > |((CAST(CAST(2017-03-01 00:00:00 AS TIMESTAMP) AS STRING) >= > CAST(CAST(2017-02-28 AS DATE) AS STRING)) AND (CAST(CAST(2017-03-01 00:00:00 > AS TIMESTAMP) AS STRING) <= CAST(CAST(2017-03-01 AS DATE) AS STRING)))| > +---+ > | > > false| > +---+{code} > As shown above, when a timestamp is compared to date in SparkSQL, both > timestamp and date are downcast to string, and leading to unexpected result. > If run the same SQL in presto/Athena, I got the expected result > {code:java} > select cast('2017-03-01 00:00:00' as timestamp) between cast('2017-02-28' as > date) and cast('2017-03-01' as date) > _col0 > 1 true > {code} > Is this a bug for Spark or a feature? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23615) Add maxDF Parameter to Python CountVectorizer
[ https://issues.apache.org/jira/browse/SPARK-23615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391577#comment-16391577 ] Bryan Cutler commented on SPARK-23615: -- Sure, go ahead > Add maxDF Parameter to Python CountVectorizer > - > > Key: SPARK-23615 > URL: https://issues.apache.org/jira/browse/SPARK-23615 > Project: Spark > Issue Type: Improvement > Components: ML, PySpark >Affects Versions: 2.4.0 >Reporter: Bryan Cutler >Priority: Minor > > The maxDF parameter is for filtering out frequently occurring terms. This > param was recently added to the Scala CountVectorizer and needs to be added > to Python also. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18165) Kinesis support in Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-18165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391557#comment-16391557 ] Vikram Agrawal commented on SPARK-18165: [~gaurav24] - yeah I saw that. Nonetheless, I have spent enough time going through the available Kinesis APIs and the Structured Streaming Source Provider requirements to come up with this library. You can give it a try and share your feedback/suggestions. > Kinesis support in Structured Streaming > --- > > Key: SPARK-18165 > URL: https://issues.apache.org/jira/browse/SPARK-18165 > Project: Spark > Issue Type: New Feature > Components: DStreams >Reporter: Lauren Moos >Priority: Major > > Implement Kinesis based sources and sinks for Structured Streaming -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18165) Kinesis support in Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-18165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391501#comment-16391501 ] Gaurav Shah commented on SPARK-18165: - Databricks has it implemented; not sure why it is exclusive to Databricks customers only. https://docs.databricks.com/spark/latest/structured-streaming/kinesis.html > Kinesis support in Structured Streaming > --- > > Key: SPARK-18165 > URL: https://issues.apache.org/jira/browse/SPARK-18165 > Project: Spark > Issue Type: New Feature > Components: DStreams >Reporter: Lauren Moos >Priority: Major > > Implement Kinesis based sources and sinks for Structured Streaming -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23625) spark sql long-running mission will be dead
[ https://issues.apache.org/jira/browse/SPARK-23625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391447#comment-16391447 ] Yu Wang commented on SPARK-23625: - [~hvanhovell] thank you for your answer, but it doesn't happen frequently. > spark sql long-running mission will be dead > --- > > Key: SPARK-23625 > URL: https://issues.apache.org/jira/browse/SPARK-23625 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 1.6.2 >Reporter: Yu Wang >Priority: Major > Fix For: 1.6.2 > > Attachments: 1520489823.png, 1520489833.png, 1520489848.png, > 1520489854.png, 1520489861.png, 1520489867.png > > > spark sql long-running mission will be dead -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23629) Building streaming-kafka-0-8-assembly or streaming-flume-assembly adds incompatible jline jar to assembly
Bruce Robbins created SPARK-23629: - Summary: Building streaming-kafka-0-8-assembly or streaming-flume-assembly adds incompatible jline jar to assembly Key: SPARK-23629 URL: https://issues.apache.org/jira/browse/SPARK-23629 Project: Spark Issue Type: Bug Components: Build Affects Versions: 2.3.0 Reporter: Bruce Robbins When you build spark-streaming-flume-assembly or streaming-kafka-0-8-assembly (so that you can run the python tests), you get two versions of the jline jar in assembly: {noformat} bash-3.2$ find assembly -name "*jline*.jar" assembly/target/scala-2.11/jars/jline-0.9.94.jar assembly/target/scala-2.11/jars/jline-2.12.jar bash-3.2$ {noformat} This causes the org.apache.spark.sql.hive.thriftserver.CliSuite tests to fail: {noformat} [info] 2018-03-08 07:35:55.854 - stderr> [ERROR] Terminal initialization failed; falling back to unsupported [info] 2018-03-08 07:35:55.855 - stderr> java.lang.IncompatibleClassChangeError: Found class jline.Terminal, but interface was expected {noformat} It also causes spark-shell to throw an exception on start-up, but otherwise seems to work OK. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23598) WholeStageCodegen can lead to IllegalAccessError calling append for HashAggregateExec
[ https://issues.apache.org/jira/browse/SPARK-23598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391373#comment-16391373 ] Marco Gaido commented on SPARK-23598: - [~dvogelbacher] the parameter you are talking about is taken in account only when we split expressions and it is not done in HashAggregateExec. I haven't been able to reproduce this. If you can provide a sample to reproduce this, it would be very helpful. > WholeStageCodegen can lead to IllegalAccessError calling append for > HashAggregateExec > -- > > Key: SPARK-23598 > URL: https://issues.apache.org/jira/browse/SPARK-23598 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: David Vogelbacher >Priority: Major > > Got the following stacktrace for a large QueryPlan using WholeStageCodeGen: > {noformat} > java.lang.IllegalAccessError: tried to access method > org.apache.spark.sql.execution.BufferedRowIterator.append(Lorg/apache/spark/sql/catalyst/InternalRow;)V > from class > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7$agg_NestedClass.agg_doAggregateWithKeysOutput$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) > at 
org.apache.spark.scheduler.Task.run(Task.scala:109) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345){noformat} > After disabling codegen, everything works. > The root cause seems to be that we are trying to call the protected _append_ > method of > [BufferedRowIterator|https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java#L68] > from an inner-class of a sub-class that is loaded by a different > class-loader (after codegen compilation). > [https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-5.html#jvms-5.4.4] > states that a protected method _R_ can be accessed only if one of the > following two conditions is fulfilled: > # R is protected and is declared in a class C, and D is either a subclass of > C or C itself. Furthermore, if R is not static, then the symbolic reference > to R must contain a symbolic reference to a class T, such that T is either a > subclass of D, a superclass of D, or D itself. > # R is either protected or has default access (that is, neither public nor > protected nor private), and is declared by a class in the same run-time > package as D. > 2.) doesn't apply as we have loaded the class with a different class loader > (and are in a different package) and 1.) doesn't apply because we are > apparently trying to call the method from an inner class of a subclass of > _BufferedRowIterator_. > Looking at the Code path of _WholeStageCodeGen_, the following happens: > # In > [WholeStageCodeGen|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L527], > we create the subclass of _BufferedRowIterator_, along with a _processNext_ > method for processing the output of the child plan. 
> # In the child, which is a > [HashAggregateExec|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L517], > we create the method which shows up at the top of the stack trace (called > _doAggregateWithKeysOutput_ ) > # We add this method to the compiled code invoking _addNewFunction_ of > [CodeGenerator|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L460] > In the generated function body we call the _append_ method.| > Now, the _addNewFunction_ method states that: > {noformat} > If the code for the `OuterClass` grows too large, the function will be > inlined into a new private, inner class > {noformat} > This indeed seems to happen: the _doAggregateWithKeysOutput_ method is put > into a new private inner class. Thus, it doesn't have access to the protected > _append_
[jira] [Assigned] (SPARK-23602) PrintToStderr should behave the same in interpreted mode
[ https://issues.apache.org/jira/browse/SPARK-23602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23602: Assignee: (was: Apache Spark) > PrintToStderr should behave the same in interpreted mode > > > Key: SPARK-23602 > URL: https://issues.apache.org/jira/browse/SPARK-23602 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Herman van Hovell >Priority: Trivial > > The {{PrintToStderr}} behaves differently for the interpreted and code > generated code paths. We should fix this. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-23602) PrintToStderr should behave the same in interpreted mode
[ https://issues.apache.org/jira/browse/SPARK-23602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23602: Assignee: Apache Spark > PrintToStderr should behave the same in interpreted mode > > > Key: SPARK-23602 > URL: https://issues.apache.org/jira/browse/SPARK-23602 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Herman van Hovell >Assignee: Apache Spark >Priority: Trivial > > The {{PrintToStderr}} behaves differently for the interpreted and code-generated code paths. We should fix this.
[jira] [Commented] (SPARK-23602) PrintToStderr should behave the same in interpreted mode
[ https://issues.apache.org/jira/browse/SPARK-23602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391354#comment-16391354 ] Apache Spark commented on SPARK-23602: -- User 'mgaido91' has created a pull request for this issue: https://github.com/apache/spark/pull/20773 > PrintToStderr should behave the same in interpreted mode > > > Key: SPARK-23602 > URL: https://issues.apache.org/jira/browse/SPARK-23602 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Herman van Hovell >Priority: Trivial > > The {{PrintToStderr}} behaves differently for the interpreted and code-generated code paths. We should fix this.
[jira] [Commented] (SPARK-23549) Spark SQL unexpected behavior when comparing timestamp to date
[ https://issues.apache.org/jira/browse/SPARK-23549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391351#comment-16391351 ] Dong Jiang commented on SPARK-23549: [~kiszk], I expect your query to return false, as Presto/Athena does. A date in SQL is typically thought of as equivalent to a timestamp at 00:00:00 > Spark SQL unexpected behavior when comparing timestamp to date > -- > > Key: SPARK-23549 > URL: https://issues.apache.org/jira/browse/SPARK-23549 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.1 >Reporter: Dong Jiang >Priority: Major > > {code:java} > scala> spark.version > res1: String = 2.2.1 > scala> spark.sql("select cast('2017-03-01 00:00:00' as timestamp) between > cast('2017-02-28' as date) and cast('2017-03-01' as date)").show > +---+ > |((CAST(CAST(2017-03-01 00:00:00 AS TIMESTAMP) AS STRING) >= > CAST(CAST(2017-02-28 AS DATE) AS STRING)) AND (CAST(CAST(2017-03-01 00:00:00 > AS TIMESTAMP) AS STRING) <= CAST(CAST(2017-03-01 AS DATE) AS STRING)))| > +---+ > | > > false| > +---+{code} > As shown above, when a timestamp is compared to a date in Spark SQL, both > timestamp and date are downcast to string, leading to an unexpected result. > If I run the same SQL in Presto/Athena, I get the expected result > {code:java} > select cast('2017-03-01 00:00:00' as timestamp) between cast('2017-02-28' as > date) and cast('2017-03-01' as date) > _col0 > 1 true > {code} > Is this a bug for Spark or a feature?
[jira] [Commented] (SPARK-23549) Spark SQL unexpected behavior when comparing timestamp to date
[ https://issues.apache.org/jira/browse/SPARK-23549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391344#comment-16391344 ] Kazuaki Ishizaki commented on SPARK-23549: -- I think that this is a problem in Spark. My question is: what should the result be for the following query? select cast('2017-03-01 23:59:59' as timestamp) between cast('2017-02-28' as date) and cast('2017-03-01' as date) > Spark SQL unexpected behavior when comparing timestamp to date > -- > > Key: SPARK-23549 > URL: https://issues.apache.org/jira/browse/SPARK-23549 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.1 >Reporter: Dong Jiang >Priority: Major > > {code:java} > scala> spark.version > res1: String = 2.2.1 > scala> spark.sql("select cast('2017-03-01 00:00:00' as timestamp) between > cast('2017-02-28' as date) and cast('2017-03-01' as date)").show > +---+ > |((CAST(CAST(2017-03-01 00:00:00 AS TIMESTAMP) AS STRING) >= > CAST(CAST(2017-02-28 AS DATE) AS STRING)) AND (CAST(CAST(2017-03-01 00:00:00 > AS TIMESTAMP) AS STRING) <= CAST(CAST(2017-03-01 AS DATE) AS STRING)))| > +---+ > | > > false| > +---+{code} > As shown above, when a timestamp is compared to a date in Spark SQL, both > timestamp and date are downcast to string, leading to an unexpected result. > If I run the same SQL in Presto/Athena, I get the expected result > {code:java} > select cast('2017-03-01 00:00:00' as timestamp) between cast('2017-02-28' as > date) and cast('2017-03-01' as date) > _col0 > 1 true > {code} > Is this a bug for Spark or a feature?
[jira] [Commented] (SPARK-18165) Kinesis support in Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-18165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391305#comment-16391305 ] Vikram Agrawal commented on SPARK-18165: I have worked on an implementation of Kinesis integration as a source for Structured Streaming. It's available here: https://github.com/qubole/kinesis-sql. Please try it out. I would be happy to discuss the design details and work on any concerns. If the implementation is acceptable and there is enough interest, I will start a PR for it. > Kinesis support in Structured Streaming > --- > > Key: SPARK-18165 > URL: https://issues.apache.org/jira/browse/SPARK-18165 > Project: Spark > Issue Type: New Feature > Components: DStreams >Reporter: Lauren Moos >Priority: Major > > Implement Kinesis-based sources and sinks for Structured Streaming
[jira] [Commented] (SPARK-23513) java.io.IOException: Expected 12 fields, but got 5 for row :Spark submit error
[ https://issues.apache.org/jira/browse/SPARK-23513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391293#comment-16391293 ] zhoukang commented on SPARK-23513: -- Could you please post some more details, [~Fray]? > java.io.IOException: Expected 12 fields, but got 5 for row :Spark submit > error > --- > > Key: SPARK-23513 > URL: https://issues.apache.org/jira/browse/SPARK-23513 > Project: Spark > Issue Type: Bug > Components: EC2, Examples, Input/Output, Java API >Affects Versions: 1.4.0, 2.2.0 >Reporter: Rawia >Priority: Blocker > > Hello > I'm trying to run a Spark application (distributedWekaSpark) but when I'm > using the spark-submit command I get this error > {quote}ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) > java.io.IOException: Expected 12 fields, but got 5 for row: > outlook,temperature,humidity,windy,play > {quote} > I tried with other datasets but the same error always appeared (always 12 > fields expected)
[jira] [Assigned] (SPARK-23628) WholeStageCodegen can generate methods with too many params
[ https://issues.apache.org/jira/browse/SPARK-23628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23628: Assignee: Apache Spark > WholeStageCodegen can generate methods with too many params > --- > > Key: SPARK-23628 > URL: https://issues.apache.org/jira/browse/SPARK-23628 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Marco Gaido >Assignee: Apache Spark >Priority: Major > > In SPARK-21717, we introduced the possibility to decouple the code for > consuming rows, in order to improve performance. Unfortunately, the method > used to compute the length of the Java parameters was wrong, so we could have > errors related to exceeding the maximum number of Java params in a method.
[jira] [Commented] (SPARK-23628) WholeStageCodegen can generate methods with too many params
[ https://issues.apache.org/jira/browse/SPARK-23628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391286#comment-16391286 ] Apache Spark commented on SPARK-23628: -- User 'mgaido91' has created a pull request for this issue: https://github.com/apache/spark/pull/20772 > WholeStageCodegen can generate methods with too many params > --- > > Key: SPARK-23628 > URL: https://issues.apache.org/jira/browse/SPARK-23628 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Marco Gaido >Priority: Major > > In SPARK-21717, we introduced the possibility to decouple the code for > consuming rows, in order to improve performance. Unfortunately, the method > used to compute the length of the Java parameters was wrong, so we could have > errors related to exceeding the maximum number of Java params in a method.
[jira] [Assigned] (SPARK-23628) WholeStageCodegen can generate methods with too many params
[ https://issues.apache.org/jira/browse/SPARK-23628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23628: Assignee: (was: Apache Spark) > WholeStageCodegen can generate methods with too many params > --- > > Key: SPARK-23628 > URL: https://issues.apache.org/jira/browse/SPARK-23628 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Marco Gaido >Priority: Major > > In SPARK-21717, we introduced the possibility to decouple the code for > consuming rows, in order to improve performance. Unfortunately, the method > used to compute the length of the Java parameters was wrong, so we could have > errors related to exceeding the maximum number of Java params in a method.
[jira] [Comment Edited] (SPARK-23549) Spark SQL unexpected behavior when comparing timestamp to date
[ https://issues.apache.org/jira/browse/SPARK-23549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391282#comment-16391282 ] zhoukang edited comment on SPARK-23549 at 3/8/18 2:07 PM: -- I think this is a bug, which may be caused by the rule below (I have not run any test, just guessing): {code:java} case p @ BinaryComparison(left, right) if findCommonTypeForBinaryComparison(left.dataType, right.dataType).isDefined => val commonType = findCommonTypeForBinaryComparison(left.dataType, right.dataType).get p.makeCopy(Array(castExpr(left, commonType), castExpr(right, commonType))) {code} findCommonTypeForBinaryComparison will return StringType: {code:java} case (DateType, TimestampType) => Some(StringType) {code} Maybe we can add a new rule for this case? was (Author: cane): I think this is a bug.Which may caused by rule below: {code:java} case p @ BinaryComparison(left, right) if findCommonTypeForBinaryComparison(left.dataType, right.dataType).isDefined => val commonType = findCommonTypeForBinaryComparison(left.dataType, right.dataType).get p.makeCopy(Array(castExpr(left, commonType), castExpr(right, commonType))) {code} findCommonTypeForBinaryComparison will return StringType: {code:java} case (DateType, TimestampType) => Some(StringType) {code} May be we can add a new rule for this case? 
> Spark SQL unexpected behavior when comparing timestamp to date > -- > > Key: SPARK-23549 > URL: https://issues.apache.org/jira/browse/SPARK-23549 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.1 >Reporter: Dong Jiang >Priority: Major > > {code:java} > scala> spark.version > res1: String = 2.2.1 > scala> spark.sql("select cast('2017-03-01 00:00:00' as timestamp) between > cast('2017-02-28' as date) and cast('2017-03-01' as date)").show > +---+ > |((CAST(CAST(2017-03-01 00:00:00 AS TIMESTAMP) AS STRING) >= > CAST(CAST(2017-02-28 AS DATE) AS STRING)) AND (CAST(CAST(2017-03-01 00:00:00 > AS TIMESTAMP) AS STRING) <= CAST(CAST(2017-03-01 AS DATE) AS STRING)))| > +---+ > | > > false| > +---+{code} > As shown above, when a timestamp is compared to a date in Spark SQL, both > timestamp and date are downcast to string, leading to an unexpected result. > If I run the same SQL in Presto/Athena, I get the expected result > {code:java} > select cast('2017-03-01 00:00:00' as timestamp) between cast('2017-02-28' as > date) and cast('2017-03-01' as date) > _col0 > 1 true > {code} > Is this a bug for Spark or a feature?
[jira] [Commented] (SPARK-23549) Spark SQL unexpected behavior when comparing timestamp to date
[ https://issues.apache.org/jira/browse/SPARK-23549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391282#comment-16391282 ] zhoukang commented on SPARK-23549: -- I think this is a bug, which may be caused by the rule below: {code:java} case p @ BinaryComparison(left, right) if findCommonTypeForBinaryComparison(left.dataType, right.dataType).isDefined => val commonType = findCommonTypeForBinaryComparison(left.dataType, right.dataType).get p.makeCopy(Array(castExpr(left, commonType), castExpr(right, commonType))) {code} findCommonTypeForBinaryComparison will return StringType: {code:java} case (DateType, TimestampType) => Some(StringType) {code} Maybe we can add a new rule for this case? > Spark SQL unexpected behavior when comparing timestamp to date > -- > > Key: SPARK-23549 > URL: https://issues.apache.org/jira/browse/SPARK-23549 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.1 >Reporter: Dong Jiang >Priority: Major > > {code:java} > scala> spark.version > res1: String = 2.2.1 > scala> spark.sql("select cast('2017-03-01 00:00:00' as timestamp) between > cast('2017-02-28' as date) and cast('2017-03-01' as date)").show > +---+ > |((CAST(CAST(2017-03-01 00:00:00 AS TIMESTAMP) AS STRING) >= > CAST(CAST(2017-02-28 AS DATE) AS STRING)) AND (CAST(CAST(2017-03-01 00:00:00 > AS TIMESTAMP) AS STRING) <= CAST(CAST(2017-03-01 AS DATE) AS STRING)))| > +---+ > | > > false| > +---+{code} > As shown above, when a timestamp is compared to a date in Spark SQL, both > timestamp and date are downcast to string, leading to an unexpected result. > If I run the same SQL in Presto/Athena, I get the expected result > {code:java} > select cast('2017-03-01 00:00:00' as timestamp) between cast('2017-02-28' as > date) and cast('2017-03-01' as date) > _col0 > 1 true > {code} > Is this a bug for Spark or a feature? 
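The surprising `false` follows directly from lexicographic string ordering once both operands are cast to StringType. A minimal illustration in plain Java (not Spark code):

```java
public class StringCompareDemo {
    public static void main(String[] args) {
        String ts = "2017-03-01 00:00:00";  // CAST(timestamp AS STRING)
        String lower = "2017-02-28";        // CAST(date AS STRING)
        String upper = "2017-03-01";

        // BETWEEN is rewritten as: lower <= ts AND ts <= upper, where both
        // comparisons are lexicographic once the operands are strings.
        boolean geLower = ts.compareTo(lower) >= 0;
        boolean leUpper = ts.compareTo(upper) <= 0;

        // "2017-03-01 00:00:00" sorts *after* its prefix "2017-03-01",
        // so the upper-bound check fails and BETWEEN yields false.
        System.out.println(geLower);             // true
        System.out.println(leUpper);             // false
        System.out.println(geLower && leUpper);  // false
    }
}
```

A timestamp-aware comparison (promoting the date to a timestamp at midnight, as Presto/Athena do) would make the upper bound an equality and the whole BETWEEN true.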
[jira] [Resolved] (SPARK-22751) Improve ML RandomForest shuffle performance
[ https://issues.apache.org/jira/browse/SPARK-22751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-22751. --- Resolution: Fixed Fix Version/s: 2.4.0 Issue resolved by pull request 20472 [https://github.com/apache/spark/pull/20472] > Improve ML RandomForest shuffle performance > --- > > Key: SPARK-22751 > URL: https://issues.apache.org/jira/browse/SPARK-22751 > Project: Spark > Issue Type: Improvement > Components: ML >Affects Versions: 2.2.0 >Reporter: lucio35 >Assignee: lucio35 >Priority: Minor > Fix For: 2.4.0 > > Original Estimate: 48h > Remaining Estimate: 48h > > When I try to use ML RandomForest to train a classifier with the dataset > news20.binary, which has 19,996 training examples and 1,355,191 features, I > found that the shuffle write size (51 GB) of findSplitsBySorting is very large > compared with the small data size (133.52 MB). I think it is useful to > replace groupByKey with reduceByKey to improve shuffle performance.
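The gain from swapping groupByKey for reduceByKey comes from map-side combining: at most (#partitions × #distinct keys) records cross the shuffle boundary instead of one record per input row. A toy Java simulation of the record counts — the keys and values are hypothetical, this is not the RandomForest code:

```java
import java.util.*;

public class MapSideCombineSketch {
    public static void main(String[] args) {
        // Two hypothetical map-side partitions of (splitKey, count) records.
        List<List<Map.Entry<String, Integer>>> partitions = List.of(
            List.of(Map.entry("f1", 1), Map.entry("f1", 1), Map.entry("f2", 1)),
            List.of(Map.entry("f1", 1), Map.entry("f2", 1), Map.entry("f2", 1)));

        // groupByKey: every record is written to the shuffle.
        int groupByKeyRecords = partitions.stream().mapToInt(List::size).sum();

        // reduceByKey: values are combined per key within each partition
        // first, so only one record per (partition, key) pair is shuffled.
        int reduceByKeyRecords = 0;
        for (List<Map.Entry<String, Integer>> part : partitions) {
            Map<String, Integer> combined = new HashMap<>();
            for (Map.Entry<String, Integer> rec : part) {
                combined.merge(rec.getKey(), rec.getValue(), Integer::sum);
            }
            reduceByKeyRecords += combined.size();
        }

        System.out.println(groupByKeyRecords);   // 6
        System.out.println(reduceByKeyRecords);  // 4
    }
}
```

With 19,996 rows but far fewer distinct split keys per partition, the same effect explains a 51 GB shuffle shrinking dramatically.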
[jira] [Assigned] (SPARK-22751) Improve ML RandomForest shuffle performance
[ https://issues.apache.org/jira/browse/SPARK-22751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen reassigned SPARK-22751: - Assignee: lucio35 > Improve ML RandomForest shuffle performance > --- > > Key: SPARK-22751 > URL: https://issues.apache.org/jira/browse/SPARK-22751 > Project: Spark > Issue Type: Improvement > Components: ML >Affects Versions: 2.2.0 >Reporter: lucio35 >Assignee: lucio35 >Priority: Minor > Fix For: 2.4.0 > > Original Estimate: 48h > Remaining Estimate: 48h > > When I try to use ML RandomForest to train a classifier with the dataset > news20.binary, which has 19,996 training examples and 1,355,191 features, I > found that the shuffle write size (51 GB) of findSplitsBySorting is very large > compared with the small data size (133.52 MB). I think it is useful to > replace groupByKey with reduceByKey to improve shuffle performance.
[jira] [Created] (SPARK-23628) WholeStageCodegen can generate methods with too many params
Marco Gaido created SPARK-23628: --- Summary: WholeStageCodegen can generate methods with too many params Key: SPARK-23628 URL: https://issues.apache.org/jira/browse/SPARK-23628 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.3.0 Reporter: Marco Gaido In SPARK-21717, we introduced the possibility to decouple the code for consuming rows, in order to improve performance. Unfortunately, the method used to compute the length of the Java parameters was wrong, so we could have errors related to exceeding the maximum number of Java params in a method.
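For context, the limit being miscomputed comes from the class-file format: a method's parameter list may occupy at most 255 slots, and `long`/`double` parameters each take two slots (for instance methods, `this` also takes one). A small illustrative slot checker in Java — a sketch of the rule, not the actual Spark fix:

```java
public class ParamSlotCheck {
    /** JVM limit on the size of a method's parameter list, in slots. */
    static final int MAX_PARAM_SLOTS = 255;

    /** long and double occupy two slots each; everything else one. */
    static int slotsFor(Class<?> type) {
        return (type == long.class || type == double.class) ? 2 : 1;
    }

    /** True if a static method with these parameter types would be valid. */
    static boolean fitsInMethod(Class<?>... paramTypes) {
        int slots = 0;
        for (Class<?> t : paramTypes) {
            slots += slotsFor(t);
        }
        return slots <= MAX_PARAM_SLOTS;
    }

    public static void main(String[] args) {
        // 200 int params use 200 slots and fit; 200 long params use 400
        // slots and do not, even though a naive "count of parameters"
        // check (200 <= 255) would wrongly pass them.
        Class<?>[] ints = new Class<?>[200];
        Class<?>[] longs = new Class<?>[200];
        java.util.Arrays.fill(ints, int.class);
        java.util.Arrays.fill(longs, long.class);
        System.out.println(fitsInMethod(ints));   // true
        System.out.println(fitsInMethod(longs));  // false
    }
}
```

This is why a parameter-count check alone is not enough: the slot width of each parameter type has to be accounted for.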
[jira] [Resolved] (SPARK-23592) Add interpreted execution for DecodeUsingSerializer expression
[ https://issues.apache.org/jira/browse/SPARK-23592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Herman van Hovell resolved SPARK-23592. --- Resolution: Fixed Fix Version/s: 2.4.0 > Add interpreted execution for DecodeUsingSerializer expression > -- > > Key: SPARK-23592 > URL: https://issues.apache.org/jira/browse/SPARK-23592 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Herman van Hovell >Assignee: Marco Gaido >Priority: Major > Fix For: 2.4.0 > >