Hi Aakash,

Such errors can appear in large Spark pipelines. The root cause is a JVM limitation: the bytecode of a single method cannot exceed 64 KB, and the Java code that Spark generates for a very large stage can exceed that limit.

Your job does not fail in the end because of Spark's fallback: if compiling the generated code fails, Spark runs that part of the plan without whole-stage code generation (less optimized).

If you do not want to see this error, you can either disable whole-stage code generation by setting spark.sql.codegen.wholeStage=false, or try to split your complex pipeline into several Spark flows, if possible.
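A minimal PySpark sketch of the two workarounds above, under stated assumptions. The helper names (session_without_wholestage_codegen, batched, add_features_in_stages), the (name, Column) feature list, and the batch size of 25 are hypothetical and not taken from the original pipeline. Only spark.sql.codegen.wholeStage, DataFrame.withColumn and DataFrame.localCheckpoint (available from Spark 2.3) are existing Spark names. Whether checkpointing between batches is enough to keep each generated method under 64 KB depends on the actual expressions, so treat this as a starting point to experiment with.

```python
from typing import Iterator, List, Sequence, Tuple


def session_without_wholestage_codegen(app_name: str = "pipeline"):
    """Workaround 1: build a SparkSession with whole-stage codegen disabled."""
    # Imported lazily so the helpers below can be exercised without PySpark.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .appName(app_name)
        .config("spark.sql.codegen.wholeStage", "false")
        .getOrCreate()
    )


def batched(items: Sequence, size: int) -> Iterator[List]:
    """Yield consecutive chunks of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


def add_features_in_stages(df, features: Sequence[Tuple[str, object]],
                           batch_size: int = 25):
    """Workaround 2: add derived columns in batches instead of one huge plan.

    `df` is assumed to be a pyspark.sql.DataFrame and `features` a sequence of
    (column_name, Column expression) pairs; both are illustrative.
    """
    for chunk in batched(features, batch_size):
        for name, expr in chunk:
            df = df.withColumn(name, expr)
        # localCheckpoint() materializes the data and truncates the plan, so
        # the next batch should start a new, smaller generated stage.
        df = df.localCheckpoint()
    return df
```

With a real DataFrame this would be called as add_features_in_stages(df, feature_list). localCheckpoint() stores the data on the executors and gives up some fault tolerance. DataFrame.checkpoint(), after setting a checkpoint directory on the SparkContext, is the more reliable but slower alternative.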

Hope that helps,
Eyal

On Sun, Jun 17, 2018 at 8:16 AM, Aakash Basu <[email protected]> wrote:

> Hi,
>
> I already went through it, that's one use case. I've a complex and very
> big pipeline of multiple jobs under one spark session. Not getting, on how
> to solve this, as it is happening over Logistic Regression and Random
> Forest models, which I'm just using from Spark ML package rather than doing
> anything by myself.
>
> Thanks,
> Aakash.
>
> On Sun 17 Jun, 2018, 8:21 AM vaquar khan, <[email protected]> wrote:
>
>> Hi Akash,
>>
>> Please check stackoverflow.
>>
>> https://stackoverflow.com/questions/41098953/codegen-grows-beyond-64-kb-error-when-normalizing-large-pyspark-dataframe
>>
>> Regards,
>> Vaquar khan
>>
>> On Sat, Jun 16, 2018 at 3:27 PM, Aakash Basu <[email protected]>
>> wrote:
>>
>>> Hi guys,
>>>
>>> I'm getting an error when I'm feature engineering on 30+ columns to
>>> create about 200+ columns. It is not failing the job, but the ERROR shows.
>>> I want to know how can I avoid this.
>>>
>>> Spark - 2.3.1
>>> Python - 3.6
>>>
>>> Cluster Config -
>>> 1 Master - 32 GB RAM, 16 Cores
>>> 4 Slaves - 16 GB RAM, 8 Cores
>>>
>>> Input data - 8 partitions of parquet file with snappy compression.
>>>
>>> My Spark-Submit -> spark-submit --master spark://192.168.60.20:7077
>>> --num-executors 4 --executor-cores 5 --executor-memory 10G --driver-cores 5
>>> --driver-memory 25G --conf spark.sql.shuffle.partitions=60
>>> --conf spark.driver.maxResultSize=2G
>>> --conf "spark.executor.extraJavaOptions=-XX:+UseParallelGC"
>>> --conf spark.scheduler.listenerbus.eventqueue.capacity=20000
>>> --conf spark.sql.codegen=true
>>> /appdata/bblite-codebase/pipeline_data_test_run.py
>>> > /appdata/bblite-data/logs/log_10_iter_pipeline_8_partitions_33_col.txt
>>>
>>> Stack-Trace below -
>>>
>>> ERROR CodeGenerator:91 - failed to compile: org.codehaus.janino.InternalCompilerException:
>>>> Compiling "GeneratedClass": Code of method "processNext()V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3426" grows beyond 64 KB
>>>> org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass": Code of method "processNext()V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3426" grows beyond 64 KB
>>>> at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:361)
>>>> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:234)
>>>> at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:446)
>>>> at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:313)
>>>> at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:235)
>>>> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:204)
>>>> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
>>>> at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1417)
>>>> at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1493)
>>>> at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1490)
>>>> at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
>>>> at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
>>>> at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
>>>> at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
>>>> at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
>>>> at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
>>>> at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
>>>> at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1365)
>>>> at org.apache.spark.sql.execution.WholeStageCodegenExec.liftedTree1$1(WholeStageCodegenExec.scala:579)
>>>> at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:578)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
>>>> at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
>>>> at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
>>>> at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>>>> at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
>>>> at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
>>>> at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.joins.SortMergeJoinExec.doExecute(SortMergeJoinExec.scala:150)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.ProjectExec.doExecute(basicPhysicalOperators.scala:70)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.joins.SortMergeJoinExec.doExecute(SortMergeJoinExec.scala:150)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.ProjectExec.doExecute(basicPhysicalOperators.scala:70)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.columnar.InMemoryRelation.buildBuffers(InMemoryRelation.scala:107)
>>>> at org.apache.spark.sql.execution.columnar.InMemoryRelation.<init>(InMemoryRelation.scala:102)
>>>> at org.apache.spark.sql.execution.columnar.InMemoryRelation$.apply(InMemoryRelation.scala:43)
>>>> at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:97)
>>>> at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:67)
>>>> at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:91)
>>>> at org.apache.spark.sql.Dataset.persist(Dataset.scala:2924)
>>>> at sun.reflect.GeneratedMethodAccessor78.invoke(Unknown Source)
>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>>> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>>> at py4j.Gateway.invoke(Gateway.java:282)
>>>> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>> at py4j.GatewayConnection.run(GatewayConnection.java:238)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.codehaus.janino.InternalCompilerException: Code of method "processNext()V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3426" grows beyond 64 KB
>>>
>>> Thanks,
>>> Aakash.
>>
>> --
>> Regards,
>> Vaquar Khan
>> +1 -224-436-0783
>> Greater Chicago
