[ANNOUNCE] Announcing Apache Spark 2.3.4
We are happy to announce the availability of Spark 2.3.4!

Spark 2.3.4 is a maintenance release containing stability fixes. This release is based on the branch-2.3 maintenance branch of Spark. We strongly recommend that all 2.3.x users upgrade to this stable release.

To download Spark 2.3.4, head over to the download page: http://spark.apache.org/downloads.html

To view the release notes: https://spark.apache.org/releases/spark-release-2-3-4.html

We would like to acknowledge all community members for contributing to this release. This release would not have been possible without you.

Kazuaki Ishizaki
RE: Release Spark 2.3.4
The following PRs regarding SPARK-28699 have been merged into branch-2.3:
https://github.com/apache/spark/pull/25491
https://github.com/apache/spark/pull/25498 -> https://github.com/apache/spark/pull/25508 (backport to 2.3)

I will cut the `2.3.4-rc1` tag over the weekend and start the 2.3.4 RC1 vote next Monday.

Regards,
Kazuaki Ishizaki

From: "Kazuaki Ishizaki"
To: "Kazuaki Ishizaki"
Cc: Dilip Biswal, dev, Hyukjin Kwon, jzh...@apache.org, Takeshi Yamamuro, Xiao Li
Date: 2019/08/20 13:12
Subject: [EXTERNAL] RE: Release Spark 2.3.4

Due to the recent correctness issue SPARK-28699, I will delay the release of Spark 2.3.4 RC1 for a while.
https://issues.apache.org/jira/browse/SPARK-28699?focusedCommentId=16910859=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16910859

Regards,
Kazuaki Ishizaki

From: "Kazuaki Ishizaki"
To: Hyukjin Kwon
Cc: Dilip Biswal, dev, jzh...@apache.org, Takeshi Yamamuro, Xiao Li
Date: 2019/08/19 11:17
Subject: [EXTERNAL] RE: Release Spark 2.3.4

Hi all, thank you. I will prepare the RC for 2.3.4 this week, in parallel with the RC for 2.4.4 managed by Dongjoon.

Regards,
Kazuaki Ishizaki

From: Hyukjin Kwon
To: Dilip Biswal
Cc: jzh...@apache.org, dev, Kazuaki Ishizaki, Takeshi Yamamuro, Xiao Li
Date: 2019/08/17 16:37
Subject: [EXTERNAL] Re: Release Spark 2.3.4

+1 too

On Sat, Aug 17, 2019 at 3:06 PM, Dilip Biswal wrote:

+1

Regards,
Dilip Biswal
Tel: 408-463-4980
dbis...@us.ibm.com

----- Original message -----
From: John Zhuge
To: Xiao Li
Cc: Takeshi Yamamuro, Spark dev list <d...@spark.apache.org>, Kazuaki Ishizaki
Subject: [EXTERNAL] Re: Release Spark 2.3.4
Date: Fri, Aug 16, 2019 4:33 PM

+1

On Fri, Aug 16, 2019 at 4:25 PM Xiao Li wrote:
+1

On Fri, Aug 16, 2019 at 4:11 PM Takeshi Yamamuro wrote:
+1, too
Bests,
Takeshi

On Sat, Aug 17, 2019 at 7:25 AM Dongjoon Hyun wrote:
+1 for the 2.3.4 release as the last release before `branch-2.3` EOL. Also, +1 for a next-week release.
Bests,
Dongjoon.
On Fri, Aug 16, 2019 at 8:19 AM Sean Owen wrote:
I think it's fine to do these in parallel, yes. Go ahead if you are willing.

On Fri, Aug 16, 2019 at 9:48 AM Kazuaki Ishizaki wrote:
> Hi, All.
>
> Spark 2.3.3 was released six months ago (15th February, 2019): http://spark.apache.org/news/spark-2-3-3-released.html. About 18 months have passed since Spark 2.3.0 was released (28th February, 2018).
> As of today (16th August), there are 103 commits (69 JIRAs) in `branch-2.3` since 2.3.3.
>
> It would be great if we could have Spark 2.3.4.
> If it is OK, shall we start `2.3.4 RC1` concurrently with 2.4.4, or after 2.4.4 is released?
>
> An issue list in JIRA: https://issues.apache.org/jira/projects/SPARK/versions/12344844
> A commit list in GitHub since the last release: https://github.com/apache/spark/compare/66fd9c34bf406a4b5f86605d06c9607752bd637a...branch-2.3
> The 8 correctness issues resolved in branch-2.3:
> https://issues.apache.org/jira/browse/SPARK-26873?jql=project%20%3D%2012315420%20AND%20fixVersion%20%3D%2012344844%20AND%20labels%20in%20(%27correctness%27)%20ORDER%20BY%20priority%20DESC%2C%20key%20ASC
>
> Best Regards,
> Kazuaki Ishizaki

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
Release Spark 2.3.4
Hi, All.

Spark 2.3.3 was released six months ago (15th February, 2019): http://spark.apache.org/news/spark-2-3-3-released.html. About 18 months have passed since Spark 2.3.0 was released (28th February, 2018). As of today (16th August), there are 103 commits (69 JIRAs) in `branch-2.3` since 2.3.3.

It would be great if we could have Spark 2.3.4. If it is OK, shall we start `2.3.4 RC1` concurrently with 2.4.4, or after 2.4.4 is released?

An issue list in JIRA: https://issues.apache.org/jira/projects/SPARK/versions/12344844
A commit list in GitHub since the last release: https://github.com/apache/spark/compare/66fd9c34bf406a4b5f86605d06c9607752bd637a...branch-2.3
The 8 correctness issues resolved in branch-2.3:
https://issues.apache.org/jira/browse/SPARK-26873?jql=project%20%3D%2012315420%20AND%20fixVersion%20%3D%2012344844%20AND%20labels%20in%20(%27correctness%27)%20ORDER%20BY%20priority%20DESC%2C%20key%20ASC

Best Regards,
Kazuaki Ishizaki
RE: Release Apache Spark 2.4.4
Thanks, Dongjoon! +1

Kazuaki Ishizaki

From: Hyukjin Kwon
To: Takeshi Yamamuro
Cc: Dongjoon Hyun, dev, User
Date: 2019/08/14 09:21
Subject: [EXTERNAL] Re: Release Apache Spark 2.4.4

+1

On Wed, Aug 14, 2019 at 9:13 AM, Takeshi Yamamuro wrote:

Hi,

Thanks for your notification, Dongjoon! I put some links here so the other committers/PMCs can access the info easily:

A commit list in GitHub since the last release: https://github.com/apache/spark/compare/5ac2014e6c118fbeb1fe8e5c8064c4a8ee9d182a...branch-2.4
An issue list in JIRA: https://issues.apache.org/jira/projects/SPARK/versions/12345466#release-report-tab-body
The 5 correctness issues resolved in branch-2.4:
https://issues.apache.org/jira/browse/SPARK-27798?jql=project%20%3D%2012315420%20AND%20fixVersion%20%3D%2012345466%20AND%20labels%20in%20(%27correctness%27)%20ORDER%20BY%20priority%20DESC%2C%20key%20ASC

Anyway, +1

Best,
Takeshi

On Wed, Aug 14, 2019 at 8:25 AM DB Tsai wrote:
+1

On Tue, Aug 13, 2019 at 4:16 PM Dongjoon Hyun wrote:
> Hi, All.
>
> Spark 2.4.3 was released three months ago (8th May).
> As of today (13th August), there are 112 commits (75 JIRAs) in `branch-2.4` since 2.4.3.
>
> It would be great if we could have Spark 2.4.4.
> Shall we start `2.4.4 RC1` next Monday (19th August)?
>
> Last time, there was a request for a K8s issue, and now I'm waiting for SPARK-27900.
> Please let me know if there is another issue.
>
> Thanks,
> Dongjoon.

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
Re: Release Apache Spark 2.4.4 before 3.0.0
Thank you, Dongjoon, for being a release manager. If the assumed dates are OK, I would like to volunteer as the 2.3.4 release manager.

Best Regards,
Kazuaki Ishizaki

From: Dongjoon Hyun
To: dev, "user @spark", Apache Spark PMC
Date: 2019/07/13 07:18
Subject: [EXTERNAL] Re: Release Apache Spark 2.4.4 before 3.0.0

Thank you, Jacek.

BTW, I added `@private` since we need the PMC's help to make an Apache Spark release. Can I get more feedback from the other PMC members? Please let me know if you have any concerns (e.g. release date or release manager).

As one of the community members, I assumed the following (if we are on schedule):
- 2.4.4 at the end of July
- 2.3.4 at the end of August (since 2.3.0 was released at the end of February 2018)
- 3.0.0 (possibly September?)
- 3.1.0 (January 2020?)

Bests,
Dongjoon.

On Thu, Jul 11, 2019 at 1:30 PM Jacek Laskowski wrote:

Hi,

Thanks, Dongjoon Hyun, for stepping up as a release manager! Much appreciated. If there's a volunteer to cut a release, I'm always happy to support it. In addition, the more frequent the releases, the better for end users: they get the choice to upgrade and pick up all the latest fixes, or to wait. It's their call, not ours (if we kept them waiting). My two big yeses for the release!

Jacek

On Tue, 9 Jul 2019, 18:15 Dongjoon Hyun wrote:

Hi, All.

Spark 2.4.3 was released two months ago (8th May). As of today (9th July), there are 45 fixes in `branch-2.4`, including the following correctness or blocker issues.
- SPARK-26038 Decimal toScalaBigInt/toJavaBigInteger not work for decimals not fitting in long
- SPARK-26045 Error in the spark 2.4 release package with the spark-avro_2.11 dependency
- SPARK-27798 from_avro can modify variables in other rows in local mode
- SPARK-27907 HiveUDAF should return NULL in case of 0 rows
- SPARK-28157 Make SHS clear KVStore LogInfo for the blacklist entries
- SPARK-28308 CalendarInterval sub-second part should be padded before parsing

It would be great if we could have Spark 2.4.4 before we get busier with 3.0.0. If it's okay, I'd like to volunteer as the 2.4.4 release manager and roll it next Monday (15th July). What do you think?

Bests,
Dongjoon.
Re: [Help] Codegen Stage grows beyond 64 KB
If it is difficult to create a small standalone program, another approach would be to attach everything (i.e. configuration, data, program, console output, log, history server data, etc.). For the log, the community would recommend the info log with "spark.sql.codegen.logging.maxLines=2147483647"; the log has to include all of the generated Java methods. The community may take more time to address this problem than in the case with a small program.

Best Regards,
Kazuaki Ishizaki

From: Aakash Basu
To: Kazuaki Ishizaki
Cc: vaquar khan, Eyal Zituny, user
Date: 2018/06/21 01:29
Subject: Re: [Help] Codegen Stage grows beyond 64 KB

Hi Kazuaki,

It would be really difficult to produce a small standalone code to reproduce this problem because I'm running through a big feature-engineering pipeline where I derive a lot of variables based on the present ones, exploding the size of the table by many folds. Then, when I do any kind of join, this error shoots up. I tried with wholeStage.codegen=false, but that errors out the entire program rather than running it with less-optimized code. Any suggestion on how I can proceed towards a JIRA entry for this?

Thanks,
Aakash.

On Wed, Jun 20, 2018 at 9:41 PM, Kazuaki Ishizaki wrote:

Spark 2.3 tries to split large generated Java methods into smaller methods where possible. However, this report suggests there may remain places that generate a large method. Would it be possible to create a JIRA entry with a small standalone program that can reproduce this problem? It would be very helpful for the community in addressing this problem.

Best regards,
Kazuaki Ishizaki

From: vaquar khan
To: Eyal Zituny
Cc: Aakash Basu, user <user@spark.apache.org>
Date: 2018/06/18 01:57
Subject: Re: [Help] Codegen Stage grows beyond 64 KB

Totally agreed with Eyal.

The problem is that when the Java code that Catalyst generates from DataFrame and Dataset programs is compiled into Java bytecode, the bytecode of a single method must not reach 64 KB. This is a limitation of the Java class file format, and exceeding it is what raises the exception. To avoid this, when Catalyst generates Java code, Spark splits methods whose compiled bytecode is likely to exceed 64 KB into multiple smaller methods. You can also use persist() or any other logical separation in the pipeline.

Regards,
Vaquar khan

On Sun, Jun 17, 2018 at 5:25 AM, Eyal Zituny wrote:

Hi Aakash,

Such errors might appear in large Spark pipelines; the root cause is a 64 KB JVM limitation. The reason that your job isn't failing in the end is Spark's fallback: if codegen fails, the Spark compiler will try to create the flow without codegen (less optimized). If you do not want to see this error, you can either disable whole-stage codegen using the flag spark.sql.codegen.wholeStage=false, or you can try to split your complex pipeline into several Spark flows if possible.

Hope that helps,
Eyal

On Sun, Jun 17, 2018 at 8:16 AM, Aakash Basu wrote:

Hi,

I already went through it; that's one use case. I have a complex and very big pipeline of multiple jobs under one Spark session. I'm not getting how to solve this, as it is happening with the Logistic Regression and Random Forest models, which I'm just using from the Spark ML package rather than implementing anything myself.

Thanks,
Aakash.

On Sun, 17 Jun 2018, 8:21 AM vaquar khan wrote:

Hi Aakash,

Please check Stack Overflow:
https://stackoverflow.com/questions/41098953/codegen-grows-beyond-64-kb-error-when-normalizing-large-pyspark-dataframe

Regards,
Vaquar khan

On Sat, Jun 16, 2018 at 3:27 PM, Aakash Basu wrote:

Hi guys,

I'm getting an error when feature engineering on 30+ columns to create about 200+ columns. It is not failing the job, but the ERROR shows. I want to know how I can avoid this.

Spark - 2.3.1
Python - 3.6

Cluster Config -
1 Master - 32 GB RAM, 16 Cores
4 Slaves - 16 GB RAM, 8 Cores

Input data - 8 partitions of parquet file with snappy compression.

My spark-submit ->

spark-submit --master spark://192.168.60.20:7077 --num-executors 4 --executor-cores 5 --executor-memory 10G --driver-cores 5 --driver-memory 25G --conf spark.sql.shuffle.partitions=60 --conf spark.driver.maxResultSize=2G --conf "spark.executor.extraJavaOptions=-XX:+UseParallelGC" --conf spark.scheduler.listenerbus.eventqueue.capacity=2 --conf spark.sql.codegen=true /appdata/bblite-codebase/pipeline_data_test_run.py > /appdata/bblite-data/logs/log_10_iter_pipeline_8_partitions_33_col.txt

Stack-Trace below -

ERROR CodeGenerator:91 - failed to compile: org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass": Code of method "processNext()V" of class "org.apache.spark.sql.catalyst.expressions.Genera
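Note that the flag Eyal mentions in this thread is `spark.sql.codegen.wholeStage` (the attempt quoted above as `wholeStage.codegen=false` has the key reversed). As a sketch, reusing the master URL and script path from the report above, the flag can be passed at submit time:

```shell
# Sketch: re-run the reported job with whole-stage codegen disabled,
# so the less-optimized, non-codegen execution path is used instead.
spark-submit \
  --master spark://192.168.60.20:7077 \
  --conf spark.sql.codegen.wholeStage=false \
  /appdata/bblite-codebase/pipeline_data_test_run.py
```

The same key can also be set in conf/spark-defaults.conf or via SparkSession config.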
Re: [Help] Codegen Stage grows beyond 64 KB
Spark 2.3 tries to split large generated Java methods into smaller methods where possible. However, this report suggests there may remain places that generate a large method. Would it be possible to create a JIRA entry with a small standalone program that can reproduce this problem? It would be very helpful for the community in addressing this problem.

Best regards,
Kazuaki Ishizaki

From: vaquar khan
To: Eyal Zituny
Cc: Aakash Basu, user
Date: 2018/06/18 01:57
Subject: Re: [Help] Codegen Stage grows beyond 64 KB

Totally agreed with Eyal.

The problem is that when the Java code that Catalyst generates from DataFrame and Dataset programs is compiled into Java bytecode, the bytecode of a single method must not reach 64 KB. This is a limitation of the Java class file format, and exceeding it is what raises the exception. To avoid this, when Catalyst generates Java code, Spark splits methods whose compiled bytecode is likely to exceed 64 KB into multiple smaller methods. You can also use persist() or any other logical separation in the pipeline.

Regards,
Vaquar khan

On Sun, Jun 17, 2018 at 5:25 AM, Eyal Zituny wrote:

Hi Aakash,

Such errors might appear in large Spark pipelines; the root cause is a 64 KB JVM limitation. The reason that your job isn't failing in the end is Spark's fallback: if codegen fails, the Spark compiler will try to create the flow without codegen (less optimized). If you do not want to see this error, you can either disable whole-stage codegen using the flag spark.sql.codegen.wholeStage=false, or you can try to split your complex pipeline into several Spark flows if possible.

Hope that helps,
Eyal

On Sun, Jun 17, 2018 at 8:16 AM, Aakash Basu wrote:

Hi,

I already went through it; that's one use case. I have a complex and very big pipeline of multiple jobs under one Spark session. I'm not getting how to solve this, as it is happening with the Logistic Regression and Random Forest models, which I'm just using from the Spark ML package rather than implementing anything myself.

Thanks,
Aakash.

On Sun, 17 Jun 2018, 8:21 AM vaquar khan wrote:

Hi Aakash,

Please check Stack Overflow:
https://stackoverflow.com/questions/41098953/codegen-grows-beyond-64-kb-error-when-normalizing-large-pyspark-dataframe

Regards,
Vaquar khan

On Sat, Jun 16, 2018 at 3:27 PM, Aakash Basu wrote:

Hi guys,

I'm getting an error when feature engineering on 30+ columns to create about 200+ columns. It is not failing the job, but the ERROR shows. I want to know how I can avoid this.

Spark - 2.3.1
Python - 3.6

Cluster Config -
1 Master - 32 GB RAM, 16 Cores
4 Slaves - 16 GB RAM, 8 Cores

Input data - 8 partitions of parquet file with snappy compression.

My spark-submit ->

spark-submit --master spark://192.168.60.20:7077 --num-executors 4 --executor-cores 5 --executor-memory 10G --driver-cores 5 --driver-memory 25G --conf spark.sql.shuffle.partitions=60 --conf spark.driver.maxResultSize=2G --conf "spark.executor.extraJavaOptions=-XX:+UseParallelGC" --conf spark.scheduler.listenerbus.eventqueue.capacity=2 --conf spark.sql.codegen=true /appdata/bblite-codebase/pipeline_data_test_run.py > /appdata/bblite-data/logs/log_10_iter_pipeline_8_partitions_33_col.txt

Stack-Trace below -

ERROR CodeGenerator:91 - failed to compile: org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass": Code of method "processNext()V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3426" grows beyond 64 KB
org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass": Code of method "processNext()V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3426" grows beyond 64 KB
    at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:361)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:234)
    at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:446)
    at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:313)
    at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:235)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:204)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1417)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1493)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1490)
    at org.spark_project.guava.cache.LocalCache$LoadingValueRefer
Re: Strange codegen error for SortMergeJoin in Spark 2.2.1
Thank you for reporting the problem. Would it be possible to create a JIRA entry with a small program that can reproduce this problem?

Best Regards,
Kazuaki Ishizaki

From: Rico Bergmann
To: "user@spark.apache.org"
Date: 2018/06/05 19:58
Subject: Strange codegen error for SortMergeJoin in Spark 2.2.1

Hi!

I get a strange error when executing a complex SQL query involving 4 tables that are left-outer-joined:

Caused by: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 37, Column 18: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 37, Column 18: No applicable constructor/method found for actual parameters "int"; candidates are: "org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(org.apache.spark.memory.TaskMemoryManager, org.apache.spark.storage.BlockManager, org.apache.spark.serializer.SerializerManager, org.apache.spark.TaskContext, int, long, int, int)", "org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(int, int)"
...
/* 037 */ smj_matches = new org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(2147483647);

The same query works with Spark 2.2.0. I checked the Spark source code and saw that in 2.2.1 a second int was introduced into the constructor of ExternalAppendOnlyUnsafeRowArray. But looking at the code-generation part of SortMergeJoinExec:

// A list to hold all matched rows from right side.
val matches = ctx.freshName("matches")
val clsName = classOf[ExternalAppendOnlyUnsafeRowArray].getName
val spillThreshold = getSpillThreshold
val inMemoryThreshold = getInMemoryThreshold
ctx.addMutableState(clsName, matches, s"$matches = new $clsName($inMemoryThreshold, $spillThreshold);")

it should pass 2 parameters, not just one. Maybe someone has an idea?

Best,
Rico.
Re: tuning - Spark data serialization for cache() ?
For DataFrame (and Dataset) cache(), neither Java nor Kryo serialization is used. There is no way to use Java or Kryo serialization for DataFrame.cache() or Dataset.cache() in memory. Are you talking about serialization to disk? In my previous mail, I talked only about in-memory.

Regards,
Kazuaki Ishizaki

From: Ofir Manor <ofir.ma...@equalum.io>
To: Kazuaki Ishizaki <ishiz...@jp.ibm.com>
Cc: user <user@spark.apache.org>
Date: 2017/08/08 03:12
Subject: Re: tuning - Spark data serialization for cache() ?

Thanks a lot for the quick pointer! So, is the advice I linked to in the official Spark 2.2 documentation misleading? You are saying that Spark 2.2 does not use Java serialization? And the tip to switch to Kryo is also outdated?

Ofir Manor
Co-Founder & CTO | Equalum
Mobile: +972-54-7801286 | Email: ofir.ma...@equalum.io

On Mon, Aug 7, 2017 at 8:47 PM, Kazuaki Ishizaki <ishiz...@jp.ibm.com> wrote:

For DataFrame (and Dataset), cache() already uses fast serialization/deserialization with data compression schemes. We have already identified some performance issues regarding cache() and are working on alleviating them in https://issues.apache.org/jira/browse/SPARK-14098. We expect these PRs to be integrated into Spark 2.3.

Kazuaki Ishizaki

From: Ofir Manor <ofir.ma...@equalum.io>
To: user <user@spark.apache.org>
Date: 2017/08/08 02:04
Subject: tuning - Spark data serialization for cache() ?

Hi,

I'm using Spark 2.2 and have a big batch job using DataFrames (with built-in, basic types). It references the same intermediate DataFrame multiple times, so I wanted to try cache() and see if it helps, both in memory footprint and performance. Now, the Spark 2.2 tuning page (http://spark.apache.org/docs/latest/tuning.html) clearly says:
1. The default Spark serialization is Java serialization.
2. It is recommended to switch to Kryo serialization.
3. "Since Spark 2.0.0, we internally use Kryo serializer when shuffling RDDs with simple types, arrays of simple types, or string type".

Now, I remember that at the 2.0 launch, there was discussion of a third serialization format that is much more performant and compact (Encoders?), but it is not referenced in the tuning guide and its Scala doc is not very clear to me. Specifically, Databricks shared some graphs of how much better it is than Kryo and Java serialization - see Encoders here: https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html

So, is that relevant to cache()? If so, how can I enable it - and is it for MEMORY_AND_DISK_ONLY or MEMORY_AND_DISK_SER? I tried to play with some other variations, like enabling Kryo per the tuning guide instructions, but didn't see any impact on the cached DataFrame size (same tens of GBs in the UI). So any tips around that?

Thanks,

Ofir Manor
Co-Founder & CTO | Equalum
Mobile: +972-54-7801286 | Email: ofir.ma...@equalum.io
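As background for the tuning-guide discussion in this thread: plain Java serialization carries per-object overhead (a stream header and class descriptors), which is one reason the guide recommends Kryo for RDDs of simple types. A standalone JDK sketch (no Spark involved) that measures the serialized size of a primitive array:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class JavaSerializationSize {
    public static void main(String[] args) throws IOException {
        // 1000 ints = 4000 bytes of raw data.
        int[] data = new int[1000];

        // Serialize with built-in Java serialization.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(data);
        }

        int size = bytes.size();
        // Java serialization adds a stream header and a class descriptor
        // on top of the 4000 raw bytes, so the result is slightly larger.
        System.out.println(size > 4000 && size < 4200);
    }
}
```

For a single large array the overhead is small; for many small objects it dominates, which is where Kryo (or Spark's internal columnar cache format) wins.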
Re: tuning - Spark data serialization for cache() ?
For DataFrame (and Dataset), cache() already uses fast serialization/deserialization with data compression schemes. We have already identified some performance issues regarding cache() and are working on alleviating them in https://issues.apache.org/jira/browse/SPARK-14098. We expect these PRs to be integrated into Spark 2.3.

Kazuaki Ishizaki

From: Ofir Manor <ofir.ma...@equalum.io>
To: user <user@spark.apache.org>
Date: 2017/08/08 02:04
Subject: tuning - Spark data serialization for cache() ?

Hi,

I'm using Spark 2.2 and have a big batch job using DataFrames (with built-in, basic types). It references the same intermediate DataFrame multiple times, so I wanted to try cache() and see if it helps, both in memory footprint and performance. Now, the Spark 2.2 tuning page (http://spark.apache.org/docs/latest/tuning.html) clearly says:
1. The default Spark serialization is Java serialization.
2. It is recommended to switch to Kryo serialization.
3. "Since Spark 2.0.0, we internally use Kryo serializer when shuffling RDDs with simple types, arrays of simple types, or string type".

Now, I remember that at the 2.0 launch, there was discussion of a third serialization format that is much more performant and compact (Encoders?), but it is not referenced in the tuning guide and its Scala doc is not very clear to me. Specifically, Databricks shared some graphs of how much better it is than Kryo and Java serialization - see Encoders here: https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html

So, is that relevant to cache()? If so, how can I enable it - and is it for MEMORY_AND_DISK_ONLY or MEMORY_AND_DISK_SER? I tried to play with some other variations, like enabling Kryo per the tuning guide instructions, but didn't see any impact on the cached DataFrame size (same tens of GBs in the UI). So any tips around that?

Thanks,

Ofir Manor
Co-Founder & CTO | Equalum
Mobile: +972-54-7801286 | Email: ofir.ma...@equalum.io
Re: Java access to internal representation of DataTypes.DateType
Does this code help you?
https://github.com/apache/spark/blob/master/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java#L156-L194

Kazuaki Ishizaki

From: Anton Kravchenko <kravchenko.anto...@gmail.com>
To: "user @spark" <user@spark.apache.org>
Date: 2017/06/14 01:16
Subject: Java access to internal representation of DataTypes.DateType

How would one access the internal representation of DataTypes.DateType from the Spark (2.0.1) Java API? From https://github.com/apache/spark/blob/51b1c1551d3a7147403b9e821fcc7c8f57b4824c/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala : "Internally, this is represented as the number of days from 1970-01-01."
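The internal representation quoted above (the number of days from 1970-01-01) can be reproduced directly with the JDK, without Spark: `java.time.LocalDate.toEpochDay()` counts days from the same epoch. A small sketch (the date chosen here is arbitrary):

```java
import java.time.LocalDate;

public class DateTypeInternal {
    public static void main(String[] args) {
        // Spark's DateType stores a date internally as the number of
        // days since 1970-01-01; LocalDate.toEpochDay() uses the same scheme.
        LocalDate date = LocalDate.of(2017, 6, 14);
        long daysSinceEpoch = date.toEpochDay();
        System.out.println(daysSinceEpoch);                  // the internal int value
        System.out.println(LocalDate.ofEpochDay(daysSinceEpoch)); // round-trip back to a date
    }
}
```

LocalDate.ofEpochDay() inverts the mapping, which is handy when inspecting raw DateType values obtained from Spark's internal rows.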
Re: how do i force unit test to do whole stage codegen
Hi,

The page at that URL shows the old style of physical-plan output. The current style adds "*" as a prefix to each operation that whole-stage codegen can be applied to. So, in your test case, whole-stage codegen is already enabled!

FYI, I think this would be a good topic for d...@spark.apache.org.

Kazuaki Ishizaki

From: Koert Kuipers <ko...@tresata.com>
To: "user@spark.apache.org" <user@spark.apache.org>
Date: 2017/04/05 05:12
Subject: how do i force unit test to do whole stage codegen

I wrote my own expression with eval and doGenCode, but doGenCode never gets called in tests. Also, as a test I ran this in a unit test:

spark.range(10).select('id as 'asId).where('id === 4).explain

According to https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-sql-whole-stage-codegen.html this is supposed to show:

== Physical Plan ==
WholeStageCodegen
:  +- Project [id#0L AS asId#3L]
:     +- Filter (id#0L = 4)
:        +- Range 0, 1, 8, 10, [id#0L]

but it doesn't. Instead it shows:

== Physical Plan ==
*Project [id#12L AS asId#15L]
+- *Filter (id#12L = 4)
   +- *Range (0, 10, step=1, splits=Some(4))

So I am again missing WholeStageCodegen. Any idea why? I create the Spark session for unit tests simply as:

val session = SparkSession.builder
  .master("local[*]")
  .appName("test")
  .config("spark.sql.shuffle.partitions", 4)
  .getOrCreate()
Re: [Spark SQL & Core]: RDD to Dataset 1500 columns data with createDataFrame() throw exception of grows beyond 64 KB
Hi,

Here is the latest status for code generation. When we use the master branch, which will become Spark 2.2, the following exception occurs. The latest version fixed the 64 KB errors in this case; however, we hit another JVM limit: the number of constant pool entries.

Caused by: org.codehaus.janino.JaninoRuntimeException: Constant pool for class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection has grown past JVM limit of 0x
    at org.codehaus.janino.util.ClassFile.addToConstantPool(ClassFile.java:499)
    at org.codehaus.janino.util.ClassFile.addConstantNameAndTypeInfo(ClassFile.java:439)
    at org.codehaus.janino.util.ClassFile.addConstantMethodrefInfo(ClassFile.java:358)
    ...

While the PR https://github.com/apache/spark/pull/16648 addresses the constant pool issue, it has not been merged yet.

Regards,
Kazuaki Ishizaki

From: elevy <elev...@gmail.com>
To: user@spark.apache.org
Date: 2017/03/18 17:14
Subject: [Spark SQL & Core]: RDD to Dataset 1500 columns data with createDataFrame() throw exception of grows beyond 64 KB

Hello all,

I am using the Spark 2.1.0 release and am trying to load a BigTable CSV file with more than 1500 columns into our system. Our flow for doing that is:

- First, read the data as an RDD.
- Generate a continuing record id using zipWithIndex() (this operation exists only in the RDD API; the Dataset has a similar option, monotonically_increasing_id(), but it works per partition and creates an id which is not sequential - and that is not what we need).
- Convert the RDD to a Dataset using createDataFrame() of SparkSession.
- This last operation generates code that exceeds the JVM method size limitation of 64 KB.

I searched the web for a solution, or even a similar use case, and found a few issues that talked about the 64 KB error, but all of those cases were dealing with ~100 columns and were solved in Spark 2.1.0 by shrinking the generated code; none of them reached the JVM limitation. Any idea from the experts on this forum would be very appreciated. There are two types of solutions we are looking for:

1. How should I overcome the size of the code generation, OR
2. How can I generate a sequential ID directly on the Dataset?

Our temporary solution (which costs us a 30% performance degradation):
- read the DS as an RDD
- generate a sequential id
- write the new data as a text file
- read the data as a Dataset

Code that reproduces the issue:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import poc.commons.SparkSessionInitializer;

import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

public class RDDConverter {
    private static final int FIELD_COUNT = 1900;

    private Dataset<Row> createBigSchema(SparkSession sparkSession, int startColName, int fieldNumber) {
        JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());
        SQLContext sqlContext = new SQLContext(sparkSession.sparkContext());

        String[] row = IntStream.range(startColName, fieldNumber).mapToObj(String::valueOf).toArray(String[]::new);
        List<String[]> data = Collections.singletonList(row);
        JavaRDD<Row> rdd = jsc.parallelize(data).map(RowFactory::create);

        StructField[] structFields = IntStream.range(startColName, fieldNumber)
                .mapToObj(i -> new StructField(String.valueOf(i), DataTypes.StringType, true, Metadata.empty()))
                .toArray(StructField[]::new);
        StructType schema = DataTypes.createStructType(structFields);

        Dataset<Row> dataSet = sqlContext.createDataFrame(rdd, schema);
        dataSet.show();
        return dataSet;
    }

    public static void main(String[] args) {
        SparkSessionInitializer sparkSessionInitializer = new SparkSessionInitializer();
        SparkSession sparkSession = sparkSessionInitializer.init();

        RDDConverter rddConverter = new RDDConverter();
        rddConverter.createBigSchema(sparkSession, 0, FIELD_COUNT);
    }
}

The exception we are getting:

org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:893)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:950)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:947)
    at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
    at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
    ... 39 co
Re: Linear regression + Janino Exception
Thank you for reporting the error. I think that this is associated with https://issues.apache.org/jira/browse/SPARK-18492 The reporter of this JIRA entry has not posted the program yet. Would it be possible to add your program that can reproduce this issue to the JIRA entry? Regards, Kazuaki Ishizaki

From: janardhan shetty <janardhan...@gmail.com>
To: user <user@spark.apache.org>
Date: 2016/11/21 12:01
Subject: Re: Linear regression + Janino Exception

Seems like this is associated with: https://issues.apache.org/jira/browse/SPARK-16845

On Sun, Nov 20, 2016 at 6:09 PM, janardhan shetty <janardhan...@gmail.com> wrote:
Hi, I am trying to execute the Linear regression algorithm on Spark 2.0.2 and hitting the below error when I am fitting my training set:

val lrModel = lr.fit(train)

It happened on 2.0.0 as well. Any resolution steps are appreciated.

Error Snippet:
16/11/20 18:03:45 ERROR CodeGenerator: failed to compile: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass;[Ljava/lang/Object;)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection" grows beyond 64 KB

/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificUnsafeProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private org.apache.spark.sql.catalyst.expressions.ScalaUDF scalaUDF;
/* 009 */   private scala.Function1 catalystConverter;
/* 010 */   private scala.Function1 converter;
/* 011 */   private scala.Function1 udf;
/* 012 */   private org.apache.spark.sql.catalyst.expressions.ScalaUDF scalaUDF1;
/* 013 */   private scala.Function1 catalystConverter1;
/* 014 */   private scala.Function1 converter1;
/* 015 */   private scala.Function1 udf1;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.ScalaUDF scalaUDF2;
/* 017 */   private scala.Function1 catalystConverter2;
Re: Spark SQL is slower when DataFrame is cache in Memory
Hi Chin Wei, Thank you for confirming this on 2.0.1. I am happy to hear that it never happens there. The performance will be improved when this PR (https://github.com/apache/spark/pull/15219) is integrated. Regards, Kazuaki Ishizaki

From: Chin Wei Low <lowchin...@gmail.com>
To: Kazuaki Ishizaki/Japan/IBM@IBMJP
Cc: user <user@spark.apache.org>
Date: 2016/10/25 17:33
Subject: Re: Spark SQL is slower when DataFrame is cache in Memory

Hi Kazuaki, I print a debug log right before I call the collect, and use that to compare against the job start log (it is available when the debug log is turned on). Anyway, I tested this on Spark 2.0.1 and never saw it happen. But the query on the cached dataframe is still slightly slower than the one without caching when it runs on Spark 2.0.1. Regards, Low Chin Wei

On Tue, Oct 25, 2016 at 3:39 AM, Kazuaki Ishizaki <ishiz...@jp.ibm.com> wrote:
Hi Chin Wei, I am sorry for being late to reply. Got it. Interesting behavior. How did you measure the time between the 1st and 2nd events? Best Regards, Kazuaki Ishizaki

From: Chin Wei Low <lowchin...@gmail.com>
To: Kazuaki Ishizaki/Japan/IBM@IBMJP
Cc: user@spark.apache.org
Date: 2016/10/10 11:33
Subject: Re: Spark SQL is slower when DataFrame is cache in Memory

Hi Ishizaki san, Thanks for the reply. So, when I pre-cache the dataframe, the cache is being used during the job execution. Actually there are 3 events:
1. call res.collect
2. job started
3. job completed
I am concerned about the longer time taken between the 1st and 2nd events. It seems like query planning and optimization take longer when querying a cached dataframe. Regards, Chin Wei

On Fri, Oct 7, 2016 at 10:14 PM, Kazuaki Ishizaki <ishiz...@jp.ibm.com> wrote:
Hi Chin Wei, Yes, since you force the creation of a cache by executing df.count, Spark starts to get data from the cache for the following task:

val res = sqlContext.sql("table1 union table2 union table3")
res.collect()

If you insert 'res.explain', you can confirm which source is used to get the data, the cache or parquet:

val res = sqlContext.sql("table1 union table2 union table3")
res.explain(true)
res.collect()

Am I misunderstanding something? Best Regards, Kazuaki Ishizaki

From: Chin Wei Low <lowchin...@gmail.com>
To: Kazuaki Ishizaki/Japan/IBM@IBMJP
Cc: user@spark.apache.org
Date: 2016/10/07 20:06
Subject: Re: Spark SQL is slower when DataFrame is cache in Memory

Hi Ishizaki san, So there is a gap between res.collect and when I see this log:

spark.SparkContext: Starting job: collect at :26

Do you mean that during this time Spark already starts to get data from the cache? Shouldn't it only get the data after the job is started and tasks are distributed? Regards, Chin Wei

On Fri, Oct 7, 2016 at 3:43 PM, Kazuaki Ishizaki <ishiz...@jp.ibm.com> wrote:
Hi, I think that the result looks correct. The current Spark spends extra time getting data from a cache, for two reasons. One is the complicated path used to get the data. The other is decompression in the case of a primitive type. The new implementation (https://github.com/apache/spark/pull/15219) is ready for review. It would achieve a 1.2x performance improvement for a compressed column and a much larger improvement for an uncompressed column. Best Regards, Kazuaki Ishizaki

From: Chin Wei Low <lowchin...@gmail.com>
To: user@spark.apache.org
Date: 2016/10/07 13:05
Subject: Spark SQL is slower when DataFrame is cache in Memory

Hi, I am using Spark 1.6.0. I have a Spark application that creates and caches (in memory) DataFrames (around 50+, some on a single parquet file and some on a folder with a few parquet files) with the following code:

val df = sqlContext.read.parquet
df.persist
df.count

I union them into 3 DataFrames and register those as temp tables. Then, I run the following code:

val res = sqlContext.sql("table1 union table2 union table3")
res.collect()

res.collect() is slower when I cache the DataFrames compared to without caching, e.g. 3 seconds vs 1 second. I turned on the DEBUG log and see there is a gap from res.collect() to the start of the Spark job. Is the extra time taken by query planning & optimization? The gap does not appear when I do not cache the dataframes. Anything I am missing here? Regards, Chin Wei
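Kazuaki's point about decompression can be made concrete with a toy codec. Spark's in-memory columnar cache can apply lightweight compression such as run-length encoding to primitive columns, so every scan of cached data pays a decode cost that a plain parquet scan into rows does not. The sketch below is illustrative only and is not Spark's actual implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Toy run-length-encoding codec for an int column. The decode loop stands in
// for the per-read "decompression" overhead a cached columnar scan pays.
public class RleSketch {
    // Encode the column as (value, runLength) pairs.
    static List<int[]> encode(int[] column) {
        List<int[]> runs = new ArrayList<>();
        int i = 0;
        while (i < column.length) {
            int value = column[i];
            int run = 1;
            while (i + run < column.length && column[i + run] == value) run++;
            runs.add(new int[]{value, run});
            i += run;
        }
        return runs;
    }

    // Decoding expands every run back into individual values.
    static int[] decode(List<int[]> runs, int size) {
        int[] out = new int[size];
        int pos = 0;
        for (int[] run : runs) {
            for (int j = 0; j < run[1]; j++) out[pos++] = run[0];
        }
        return out;
    }

    public static void main(String[] args) {
        int[] column = {7, 7, 7, 1, 1, 9};
        System.out.println(encode(column).size() + " runs");  // 3 runs
    }
}
```

The trade-off in the thread follows from this shape: compression shrinks the cached footprint, but each query re-runs the decode loop, which is part of why a cached scan can be slower than a cold one for small data.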
Re: [Spark 2.0.1] Error in generated code, possible regression?
Could you provide a smaller program that can reproduce the same error? If you could also create a JIRA entry, it would be great. Kazuaki Ishizaki

From: Efe Selcuk <efema...@gmail.com>
To: "user @spark" <user@spark.apache.org>
Date: 2016/10/25 10:23
Subject: [Spark 2.0.1] Error in generated code, possible regression?

I have an application that works in 2.0.0 but has been dying at runtime on the 2.0.1 distribution.

at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:893)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:950)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:947)
at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
... 30 more
Caused by: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 74, Column 145: Unknown variable or type "value4"

It also includes a massive 1800-line generated code output (which repeats over and over, even on 1 thread, which makes this a pain), but fortunately the error occurs early, so I can give at least some context.

/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificMutableProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificMutableProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private MutableRow mutableRow;
/* 009 */   private Object[] values;
... // many lines of class variables, mostly errMsg strings and Object[]
/* 071 */   private void apply2_7(InternalRow i) {
/* 072 */
/* 073 */     boolean isNull215 = false;
/* 074 */     final com.mypackage.MyThing value215 = isNull215 ? null : (com.mypackage.MyThing) value4._2();
/* 075 */     isNull215 = value215 == null;
/* 076 */ ...

As you can see, on line 74 there is a reference to value4, but nothing called value4 has been defined. I have no idea where to even begin looking for what caused this, or even whether it is my fault or a bug in the code generation. Any help is appreciated. Efe
Re: Spark SQL is slower when DataFrame is cache in Memory
Hi Chin Wei, I am sorry for being late to reply. Got it. Interesting behavior. How did you measure the time between the 1st and 2nd events? Best Regards, Kazuaki Ishizaki

From: Chin Wei Low <lowchin...@gmail.com>
To: Kazuaki Ishizaki/Japan/IBM@IBMJP
Cc: user@spark.apache.org
Date: 2016/10/10 11:33
Subject: Re: Spark SQL is slower when DataFrame is cache in Memory

Hi Ishizaki san, Thanks for the reply. So, when I pre-cache the dataframe, the cache is being used during the job execution. Actually there are 3 events:
1. call res.collect
2. job started
3. job completed
I am concerned about the longer time taken between the 1st and 2nd events. It seems like query planning and optimization take longer when querying a cached dataframe. Regards, Chin Wei

On Fri, Oct 7, 2016 at 10:14 PM, Kazuaki Ishizaki <ishiz...@jp.ibm.com> wrote:
Hi Chin Wei, Yes, since you force the creation of a cache by executing df.count, Spark starts to get data from the cache for the following task:

val res = sqlContext.sql("table1 union table2 union table3")
res.collect()

If you insert 'res.explain', you can confirm which source is used to get the data, the cache or parquet:

val res = sqlContext.sql("table1 union table2 union table3")
res.explain(true)
res.collect()

Am I misunderstanding something? Best Regards, Kazuaki Ishizaki

From: Chin Wei Low <lowchin...@gmail.com>
To: Kazuaki Ishizaki/Japan/IBM@IBMJP
Cc: user@spark.apache.org
Date: 2016/10/07 20:06
Subject: Re: Spark SQL is slower when DataFrame is cache in Memory

Hi Ishizaki san, So there is a gap between res.collect and when I see this log:

spark.SparkContext: Starting job: collect at :26

Do you mean that during this time Spark already starts to get data from the cache? Shouldn't it only get the data after the job is started and tasks are distributed? Regards, Chin Wei

On Fri, Oct 7, 2016 at 3:43 PM, Kazuaki Ishizaki <ishiz...@jp.ibm.com> wrote:
Hi, I think that the result looks correct. The current Spark spends extra time getting data from a cache, for two reasons. One is the complicated path used to get the data. The other is decompression in the case of a primitive type. The new implementation (https://github.com/apache/spark/pull/15219) is ready for review. It would achieve a 1.2x performance improvement for a compressed column and a much larger improvement for an uncompressed column. Best Regards, Kazuaki Ishizaki

From: Chin Wei Low <lowchin...@gmail.com>
To: user@spark.apache.org
Date: 2016/10/07 13:05
Subject: Spark SQL is slower when DataFrame is cache in Memory

Hi, I am using Spark 1.6.0. I have a Spark application that creates and caches (in memory) DataFrames (around 50+, some on a single parquet file and some on a folder with a few parquet files) with the following code:

val df = sqlContext.read.parquet
df.persist
df.count

I union them into 3 DataFrames and register those as temp tables. Then, I run the following code:

val res = sqlContext.sql("table1 union table2 union table3")
res.collect()

res.collect() is slower when I cache the DataFrames compared to without caching, e.g. 3 seconds vs 1 second. I turned on the DEBUG log and see there is a gap from res.collect() to the start of the Spark job. Is the extra time taken by query planning & optimization? The gap does not appear when I do not cache the dataframes. Anything I am missing here? Regards, Chin Wei
Re: Change nullable property in Dataset schema
Thank you for your comments.

> You should just Seq(...).toDS
I tried this; however, the result is not changed.

>> val ds2 = ds1.map(e => e)
> Why are you e => e (since it's identity) and does nothing?
Yes, e => e does nothing. For the sake of simplicity of the example, I used the simplest expression in map(). In current Spark, an expression in map() does not change the schema of its output.

> .as(RowEncoder(new StructType()
>   .add("value", ArrayType(IntegerType, false), nullable = false)))
Sorry, this was my mistake. It did not work for my purpose. It actually does nothing. Kazuaki Ishizaki

From: Jacek Laskowski <ja...@japila.pl>
To: Kazuaki Ishizaki/Japan/IBM@IBMJP
Cc: user <user@spark.apache.org>
Date: 2016/08/15 04:56
Subject: Re: Change nullable property in Dataset schema

On Wed, Aug 10, 2016 at 12:04 AM, Kazuaki Ishizaki <ishiz...@jp.ibm.com> wrote:
> import testImplicits._
> test("test") {
>   val ds1 = sparkContext.parallelize(Seq(Array(1, 1), Array(2, 2), Array(3, 3)), 1).toDS
You should just Seq(...).toDS

> val ds2 = ds1.map(e => e)
Why are you e => e (since it's identity) and does nothing?

> .as(RowEncoder(new StructType()
>   .add("value", ArrayType(IntegerType, false), nullable = false)))
I didn't know it was possible, but it looks like toDF is where you could replace the schema too (in a less involved way). I learnt quite a lot from just a single email. Thanks! Pozdrawiam, Jacek
Re: Change nullable property in Dataset schema
My motivation is to simplify the Java code generated by Tungsten's compiler. Here is a dump of the generated code from the program:
https://gist.github.com/kiszk/402bd8bc45a14be29acb3674ebc4df24
If we can let Catalyst know that the result of map() is never null, we can eliminate conditional branches. For example, in the above URL, the condition at line 45 is always false since the result of map() is never null according to our schema. As a result, we can eliminate the assignments at lines 52 and 56, and the conditional branches at lines 55 and 61. Kazuaki Ishizaki

From: Koert Kuipers <ko...@tresata.com>
To: Kazuaki Ishizaki/Japan/IBM@IBMJP
Cc: "user@spark.apache.org" <user@spark.apache.org>
Date: 2016/08/16 04:35
Subject: Re: Change nullable property in Dataset schema

Why do you want the array to have nullable = false? What is the benefit?

On Wed, Aug 3, 2016 at 10:45 AM, Kazuaki Ishizaki <ishiz...@jp.ibm.com> wrote:
Dear all, Would it be possible to let me know how to change the nullable property in a Dataset? When I looked for how to change the nullable property in a Dataframe schema, I found the following approaches.
http://stackoverflow.com/questions/33193958/change-nullable-property-of-column-in-spark-dataframe
https://github.com/apache/spark/pull/13873 (Not merged yet)
However, I cannot find how to change the nullable property in a Dataset schema. Even when I wrote the following program, the nullable property for "value: array" in ds2.schema is not changed. If my understanding is correct, current Spark 2.0 uses an ExpressionEncoder that is generated based on Dataset[T] at https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala#L46

class Test extends QueryTest with SharedSQLContext {
  import testImplicits._
  test("test") {
    val ds1 = sparkContext.parallelize(Seq(Array(1, 1), Array(2, 2), Array(3, 3)), 1).toDS
    val schema = new StructType().add("array", ArrayType(IntegerType, false), false)
    val inputObject = BoundReference(0, ScalaReflection.dataTypeFor[Array[Int]], false)
    val encoder = new ExpressionEncoder[Array[Int]](schema, true,
      ScalaReflection.serializerFor[Array[Int]](inputObject).flatten,
      ScalaReflection.deserializerFor[Array[Int]],
      ClassTag[Array[Int]](classOf[Array[Int]]))
    val ds2 = ds1.map(e => e)(encoder)
    ds1.printSchema
    ds2.printSchema
  }
}

root
 |-- value: array (nullable = true)
 |    |-- element: integer (containsNull = false)

root
 |-- value: array (nullable = true)  // Expected (nullable = false)
 |    |-- element: integer (containsNull = false)

Kazuaki Ishizaki
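To make the branch-elimination argument concrete, here is a plain-Java sketch (hypothetical, not Spark-generated code) of the two shapes the generated projection can take. When the schema says nullable = true, the code must carry an isNull flag and branch on it for every value; when the optimizer can trust nullable = false, the flag, the assignments, and the branches all disappear into straight-line code:

```java
// Illustrative sketch of null-check elimination. Neither method is Spark's
// actual generated code; they only mirror its shape.
public class NullCheckSketch {
    // Shape of generated code for a nullable column: per-value flag and branch.
    static int sumNullable(Integer[] column) {
        int sum = 0;
        for (Integer v : column) {
            boolean isNull = (v == null);   // extra state per value
            if (!isNull) {                  // extra branch per value
                sum += v;
            }
        }
        return sum;
    }

    // Shape after the schema proves nullable = false: no flag, no branch.
    static int sumNonNullable(int[] column) {
        int sum = 0;
        for (int v : column) sum += v;
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumNonNullable(new int[]{1, 2, 3}));  // 6
    }
}
```

Beyond being shorter, the non-nullable shape is easier for the JIT to vectorize, which is the kind of simplification the motivation above is after.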
Re: Spark 2.0.0 JaninoRuntimeException
I just realized it, since it broke a build with Scala 2.10.
https://github.com/apache/spark/commit/fa244e5a90690d6a31be50f2aa203ae1a2e9a1cf
I can reproduce the problem in SPARK-15285 with the master branch. Should we reopen SPARK-15285? Best Regards, Kazuaki Ishizaki

From: Ted Yu <yuzhih...@gmail.com>
To: dhruve ashar <dhruveas...@gmail.com>
Cc: Aris <arisofala...@gmail.com>, "user@spark.apache.org" <user@spark.apache.org>
Date: 2016/08/15 06:19
Subject: Re: Spark 2.0.0 JaninoRuntimeException

Looks like the proposed fix was reverted:
Revert "[SPARK-15285][SQL] Generated SpecificSafeProjection.apply method grows beyond 64 KB"
This reverts commit fa244e5a90690d6a31be50f2aa203ae1a2e9a1cf.
Maybe this was fixed in some other JIRA?

On Fri, Aug 12, 2016 at 2:30 PM, dhruve ashar <dhruveas...@gmail.com> wrote:
I see a similar issue being resolved recently: https://issues.apache.org/jira/browse/SPARK-15285

On Fri, Aug 12, 2016 at 3:33 PM, Aris <arisofala...@gmail.com> wrote:
Hello folks, I'm on Spark 2.0.0 working with Datasets -- and despite the fact that smaller-data unit tests work on my laptop, when I'm on a cluster I get cryptic error messages:

Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/InternalRow;Lorg/apache/spark/sql/catalyst/InternalRow;)I" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificOrdering" grows beyond 64 KB

Unfortunately I'm not clear on how to even isolate the source of this problem. I didn't have this problem in Spark 1.6.1. Any clues?

--
-Dhruve Ashar
Re: Change nullable property in Dataset schema
After some investigation, I was able to change the nullable property in Dataset[Array[Int]] in the following way. Is this the right way?
(1) Apply https://github.com/apache/spark/pull/13873
(2) Use two Encoders. One is a RowEncoder. The other is a predefined ExpressionEncoder.

class Test extends QueryTest with SharedSQLContext {
  import testImplicits._
  test("test") {
    val ds1 = sparkContext.parallelize(Seq(Array(1, 1), Array(2, 2), Array(3, 3)), 1).toDS
    val ds2 = ds1.map(e => e)
      .as(RowEncoder(new StructType()
        .add("value", ArrayType(IntegerType, false), nullable = false)))
      .as(newDoubleArrayEncoder)
    ds1.printSchema
    ds2.printSchema
  }
}

root
 |-- value: array (nullable = true)
 |    |-- element: integer (containsNull = false)

root
 |-- value: array (nullable = false)
 |    |-- element: integer (containsNull = false)

Kazuaki Ishizaki

From: Kazuaki Ishizaki/Japan/IBM@IBMJP
To: user@spark.apache.org
Date: 2016/08/03 23:46
Subject: Change nullable property in Dataset schema

Dear all, Would it be possible to let me know how to change the nullable property in a Dataset? When I looked for how to change the nullable property in a Dataframe schema, I found the following approaches.
http://stackoverflow.com/questions/33193958/change-nullable-property-of-column-in-spark-dataframe
https://github.com/apache/spark/pull/13873 (Not merged yet)
However, I cannot find how to change the nullable property in a Dataset schema. Even when I wrote the following program, the nullable property for "value: array" in ds2.schema is not changed. If my understanding is correct, current Spark 2.0 uses an ExpressionEncoder that is generated based on Dataset[T] at https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala#L46

class Test extends QueryTest with SharedSQLContext {
  import testImplicits._
  test("test") {
    val ds1 = sparkContext.parallelize(Seq(Array(1, 1), Array(2, 2), Array(3, 3)), 1).toDS
    val schema = new StructType().add("array", ArrayType(IntegerType, false), false)
    val inputObject = BoundReference(0, ScalaReflection.dataTypeFor[Array[Int]], false)
    val encoder = new ExpressionEncoder[Array[Int]](schema, true,
      ScalaReflection.serializerFor[Array[Int]](inputObject).flatten,
      ScalaReflection.deserializerFor[Array[Int]],
      ClassTag[Array[Int]](classOf[Array[Int]]))
    val ds2 = ds1.map(e => e)(encoder)
    ds1.printSchema
    ds2.printSchema
  }
}

root
 |-- value: array (nullable = true)
 |    |-- element: integer (containsNull = false)

root
 |-- value: array (nullable = true)  // Expected (nullable = false)
 |    |-- element: integer (containsNull = false)

Kazuaki Ishizaki
Change nullable property in Dataset schema
Dear all, Would it be possible to let me know how to change the nullable property in a Dataset? When I looked for how to change the nullable property in a Dataframe schema, I found the following approaches.
http://stackoverflow.com/questions/33193958/change-nullable-property-of-column-in-spark-dataframe
https://github.com/apache/spark/pull/13873 (Not merged yet)
However, I cannot find how to change the nullable property in a Dataset schema. Even when I wrote the following program, the nullable property for "value: array" in ds2.schema is not changed. If my understanding is correct, current Spark 2.0 uses an ExpressionEncoder that is generated based on Dataset[T] at https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala#L46

class Test extends QueryTest with SharedSQLContext {
  import testImplicits._
  test("test") {
    val ds1 = sparkContext.parallelize(Seq(Array(1, 1), Array(2, 2), Array(3, 3)), 1).toDS
    val schema = new StructType().add("array", ArrayType(IntegerType, false), false)
    val inputObject = BoundReference(0, ScalaReflection.dataTypeFor[Array[Int]], false)
    val encoder = new ExpressionEncoder[Array[Int]](schema, true,
      ScalaReflection.serializerFor[Array[Int]](inputObject).flatten,
      ScalaReflection.deserializerFor[Array[Int]],
      ClassTag[Array[Int]](classOf[Array[Int]]))
    val ds2 = ds1.map(e => e)(encoder)
    ds1.printSchema
    ds2.printSchema
  }
}

root
 |-- value: array (nullable = true)
 |    |-- element: integer (containsNull = false)

root
 |-- value: array (nullable = true)  // Expected (nullable = false)
 |    |-- element: integer (containsNull = false)

Kazuaki Ishizaki
Re: Spark GraphFrames
Sorry, please ignore this mail. I apologize for my misinterpretation of GraphFrames in Spark; I thought it referred to the flame graph of a profiling tool. Kazuaki Ishizaki

From: Kazuaki Ishizaki/Japan/IBM@IBMJP
To: Divya Gehlot <divya.htco...@gmail.com>
Cc: "user @spark" <user@spark.apache.org>
Date: 2016/08/02 17:06
Subject: Re: Spark GraphFrames

Hi, Kay wrote a procedure to use GraphFrames with Spark.
https://gist.github.com/kayousterhout/7008a8ebf2babeedc7ce6f8723fd1bf4
Kazuaki Ishizaki

From: Divya Gehlot <divya.htco...@gmail.com>
To: "user @spark" <user@spark.apache.org>
Date: 2016/08/02 14:52
Subject: Spark GraphFrames

Hi, Has anybody worked with GraphFrames? Please let me know, as I need to know the real use-case scenarios where it can be used. Thanks, Divya
Re: Spark GraphFrames
Hi, Kay wrote a procedure to use GraphFrames with Spark.
https://gist.github.com/kayousterhout/7008a8ebf2babeedc7ce6f8723fd1bf4
Kazuaki Ishizaki

From: Divya Gehlot <divya.htco...@gmail.com>
To: "user @spark" <user@spark.apache.org>
Date: 2016/08/02 14:52
Subject: Spark GraphFrames

Hi, Has anybody worked with GraphFrames? Please let me know, as I need to know the real use-case scenarios where it can be used. Thanks, Divya
Re: Catalyst optimizer cpu/Io cost
Hi, Yin Huai's slide is available at http://www.slideshare.net/databricks/deep-dive-into-catalyst-apache-spark-20s-optimizer Kazuaki Ishizaki

From: Takeshi Yamamuro <linguin@gmail.com>
To: Srinivasan Hariharan02 <srinivasan_...@infosys.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Date: 2016/06/10 18:09
Subject: Re: Catalyst optimizer cpu/Io cost

Hi, There is no way to retrieve that information in Spark. In fact, the current optimizer only considers the byte size of outputs in LogicalPlan. Related code can be found at https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala#L90 If you want to know more about Catalyst, you can check Yin Huai's slide from Spark Summit 2016. https://spark-summit.org/2016/speakers/yin-huai/
# Note: the slide is not available now; it seems it will be in a few weeks. // maropu

On Fri, Jun 10, 2016 at 3:29 PM, Srinivasan Hariharan02 <srinivasan_...@infosys.com> wrote:
Hi, How can I get the Spark SQL query CPU and IO cost after optimizing for the best logical plan? Is there any API to retrieve this information? Could anyone point me to the code where the CPU and IO cost are actually computed in the catalyst module? Regards, Srinivasan Hariharan +91-9940395830

--
---
Takeshi Yamamuro
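To illustrate what "only the byte size of outputs" means in practice: the LogicalPlan statistics Takeshi points at are per-operator size estimates with no separate CPU or IO cost model. The toy estimator below is illustrative only; the class, methods, and formulas are made up for this sketch and are not Spark's code:

```java
// Toy size-only statistics in the spirit of LogicalPlan statistics: each
// operator estimates just the byte size of its output from its children.
// All names and formulas here are hypothetical.
public class SizeOnlyStats {
    // Crude cartesian-style upper bound for a join's output size.
    static long joinSize(long leftBytes, long rightBytes) {
        return leftBytes * rightBytes;
    }

    // A projection scales the child's size by the fraction of columns kept.
    static long projectSize(long childBytes, int keptColumns, int totalColumns) {
        return childBytes * keptColumns / totalColumns;
    }

    public static void main(String[] args) {
        long scan = 1_000_000L;                     // 1 MB table scan
        long projected = projectSize(scan, 2, 10);  // keep 2 of 10 columns
        System.out.println(projected);              // 200000
    }
}
```

A size-only model like this is enough for decisions such as choosing a broadcast join when one side is small, which is why the optimizer at the time exposed no per-query CPU or IO cost to retrieve.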