Re: Problem using User Defined Predicate pushdown with core RDD and parquet - UDP class not found
+spark user mailing list

Hi there,

I have exactly the same problem as described below. My current workaround is to add the jar containing my UDP to one of the system classpath directories listed under "Classpath Entries" of the Spark executors (for example, putting it under the same path as /opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/parquet-hadoop-bundle-1.5.0-cdh5.4.2.jar). Obviously, the downside is that you have to copy the jar locally to every node of the cluster, and that is hard to maintain when the cluster's setup gets updated. I'd like to hear if anyone has a better solution for this. Thanks a lot!

> -- Forwarded message --
> From: Vladimir Vladimirov
> To: dev@spark.apache.org
> Date: Mon, 19 Oct 2015 19:38:07 -0400
> Subject: Problem using User Defined Predicate pushdown with core RDD and
> parquet - UDP class not found
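A possibly less invasive variant of the same workaround might be to point the executor JVMs at the pre-staged jar via spark.executor.extraClassPath instead of dropping it into the CDH parcel directory. This is only a sketch: the /opt/myapp/lib path below is hypothetical, and the jar still has to exist at that location on every node.

```scala
// Sketch (untested): spark.executor.extraClassPath prepends a local path to
// the executor JVM's system classpath, so classes there are visible even to
// framework code (like parquet-mr) that doesn't go through Spark's
// task classloader. "/opt/myapp/lib/..." is a hypothetical, pre-staged path.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.extraClassPath",
       "/opt/myapp/lib/my-jar-with-dependencies.jar")
```

The maintenance downside is the same as with the parcel trick, but at least the setting lives in the app's configuration rather than in the CDH install.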
Problem using User Defined Predicate pushdown with core RDD and parquet - UDP class not found
Hi all,

I feel like this question is more Spark dev related than Spark user related. Please correct me if I'm wrong.

My project's data flow involves sampling records from a dataset stored as Parquet. I've checked the DataFrames API and it doesn't support user defined predicate pushdown - only simple filter expressions. I want to use Parquet's custom filter (predicate pushdown) feature while loading data with newAPIHadoopFile. Simple filters constructed with the org.apache.parquet.filter2 API work fine, but a User Defined Predicate works only in `--master local` mode.

When I try to run my test program, which hands a UDP class to parquet-mr, in yarn-client mode, I get a class not found exception.

I suspect the issue could be related to the way the class loader works in Parquet, or to the fact that the Spark executor processes load my jar from an HTTP server and some security policy interferes (the classpath shows that the jar URI is actually an HTTP URL and not a local file).

I've tried to create an uber jar with all dependencies and ship it with the Spark app - no success.

PS I'm using Spark 1.5.1.
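For reference, my understanding of how the predicate travels to the executors (simplified - parquet-mr's actual transport may differ in details): setFilterPredicate Java-serializes the predicate object into a string-valued Configuration property, and the reading side deserializes it with ObjectInputStream, which resolves classes by name through a classloader that may not include jars Spark distributed over HTTP. A minimal pure-Scala sketch of that round trip:

```scala
// Minimal sketch of shipping an object through a string-valued config
// property: serialize -> base64 string -> deserialize. On the executor side,
// ObjectInputStream.readObject resolves the class by name; if the
// deserializing classloader can't see the UDP's jar, that is exactly where
// ClassNotFoundException would be thrown.
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.Base64

def toConfString(obj: java.io.Serializable): String = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(obj)
  out.close()
  Base64.getEncoder.encodeToString(bytes.toByteArray)
}

def fromConfString(s: String): AnyRef = {
  val in = new ObjectInputStream(
    new ByteArrayInputStream(Base64.getDecoder.decode(s)))
  try in.readObject() finally in.close()
}
```

So the serialized form reaches the executor fine; it's the class resolution during readObject that fails when the jar isn't on the executor's system classpath.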
Here is the command line I'm using to submit the application:

SPARK_CLASSPATH=./lib/my-jar-with-dependencies.jar spark-submit \
  --master yarn-client \
  --num-executors 3 --driver-memory 3G --executor-memory 2G \
  --executor-cores 1 \
  --jars ./lib/my-jar-with-dependencies.jar,./lib/snappy-java-1.1.2.jar,./lib/parquet-hadoop-1.7.0.jar,./lib/parquet-avro-1.7.0.jar,./lib/parquet-column-1.7.0.jar,/opt/cloudera/parcels/CDH/jars/avro-1.7.6-cdh5.4.0.jar,/opt/cloudera/parcels/CDH/jars/avro-mapred-1.7.6-cdh5.4.0-hadoop2.jar \
  --class my.app.parquet.filters.tools.TestSparkApp \
  ./lib/my-jar-with-dependencies.jar \
  yarn-client \
  "/user/vvlad/2015/*/*/*/EVENTS"

Here is the code of my UDP class:

package my.app.parquet.filters.udp

import java.lang.{Integer => JInt}

import scala.util.Random

import org.apache.parquet.filter2.predicate.Statistics
import org.apache.parquet.filter2.predicate.UserDefinedPredicate

class SampleIntColumn(threshold: Double)
    extends UserDefinedPredicate[JInt] with Serializable {

  lazy val random = new Random()
  val myThreshold = threshold

  // Keep each record with probability myThreshold, regardless of the value.
  override def keep(value: JInt): Boolean =
    random.nextFloat() < myThreshold

  // Never drop whole row groups: the sample must consider every record.
  override def canDrop(statistics: Statistics[JInt]): Boolean = false

  override def inverseCanDrop(statistics: Statistics[JInt]): Boolean = false

  override def toString: String =
    "%s(%f)".format(getClass.getName, myThreshold)
}

Spark app:

package my.app.parquet.filters.tools

import my.app.parquet.filters.udp.SampleIntColumn
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.avro.AvroReadSupport
import org.apache.parquet.filter2.dsl.Dsl._
import org.apache.parquet.filter2.predicate.FilterPredicate
import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object TestSparkApp {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      // "local[2]", yarn-client, etc.
      .setMaster(args(0))
      .setAppName("Spark Scala App")
      .set("spark.executor.memory", "1g")
      .set("spark.rdd.compress", "true")
      .set("spark.storage.memoryFraction", "1")

    val sc = new SparkContext(conf)

    val job = new Job(sc.hadoopConfiguration)
    ParquetInputFormat.setReadSupportClass(job,
      classOf[AvroReadSupport[GenericRecord]])

    val sampler = new SampleIntColumn(0.05)
    val impField = IntColumn("impression")

    val pred: FilterPredicate = impField.filterBy(sampler)

    ParquetInputFormat.setFilterPredicate(job.getConfiguration, pred)

    println(job.getConfiguration.get("parquet.private.read.filter.predicate"))
    println(job.getConfiguration.get("parquet.private.read.filter.predicate.human.readable"))

    val records1 = sc.newAPIHadoopFile(
      args(1),
      classOf[ParquetInputFormat[GenericRecord]],
      classOf[Void],
      classOf[GenericRecord],
      job.getConfiguration
    ).map(_._2).cache()

    println("result count " + records1.count().toString)

    sc.stop()
  }
}

Here are the logs with the exception I'm getting:

15/10/19 11:14:43 INFO TaskSetManager: Starting task 21.0 in stage 0.0 (TID 0, hdp010, NODE_LOCAL, 2815 bytes)
15/10/19 11:14:43 INFO TaskSetManager: Starting task 14.0 in stage 0.0 (TID 1, hdp042, NODE_LOCAL, 2816 bytes)
15/10/19 11:14:43 INFO YarnClientSchedulerBackend: Registered executor: AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@hdp027:43593/user/Executor#-832887318]) with ID 3
15/10/19 11:14:43 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 2, hdp027, NODE_LOCAL, 2814 bytes)
15/10/19 11:14:44 INFO BlockManagerMasterEndpoint: Registering block manager hdp027:64266 with 883.8 MB RAM, BlockManagerId(3,
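For completeness, the sampling behavior the UDP is meant to implement can be checked in plain Scala, independent of Parquet: keep() flips a coin per record, so the kept fraction converges to the threshold. A small sketch (the seeded Random and the function name are my additions, for reproducibility):

```scala
import scala.util.Random

// Same decision rule as SampleIntColumn.keep: retain each record with
// probability `threshold`, independent of the value read from the column.
def sampledFraction(threshold: Double, records: Int, seed: Long): Double = {
  val rng = new Random(seed)
  val kept = (1 to records).count(_ => rng.nextFloat() < threshold)
  kept.toDouble / records
}
```

For example, sampledFraction(0.05, 200000, 42L) should land very close to 0.05. One caveat with random sampling inside a predicate: the decision is made fresh on every read, so if Spark recomputes a lost partition, that partition can yield a different sample.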