[ https://issues.apache.org/jira/browse/SPARK-22702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen resolved SPARK-22702.
-------------------------------
    Resolution: Invalid

This should start on the mailing list. I must say I'm not clear what you're reporting: what is being printed, why do you not expect this output, and is there a simpler reproduction?

> Spark SQL filter with size function (if filter matches) leads to twice calculation
> ----------------------------------------------------------------------
>
>                 Key: SPARK-22702
>                 URL: https://issues.apache.org/jira/browse/SPARK-22702
>             Project: Spark
>          Issue Type: Question
>          Components: SQL
>    Affects Versions: 2.1.0
>        Environment: jdk1.7.0_67
>                     spark-hive_2.11
>            Reporter: chenfh5
>            Priority: Minor
>
> I ran into an issue with Spark SQL. After building a Dataset through some transformations, I want to persist it because it will be reused many times later. However, when the Dataset is materialized, its logic is computed twice.
> I wrote a local test to reproduce the issue, and it does reproduce. I tried three filter conditions and found:
> {code}
> .filter(col("id") > 10)              //expected
> .filter(length(col("name")) > 4)     //expected
> .filter(size(col("seq_name")) > 1)   //unexpected when this filter is present
> {code}
> i.e., the twice-calculation issue occurs when the size() filter produces a non-empty result.
> h5. result image
> h6. expected
> !http://upload-images.jianshu.io/upload_images/2189341-51ee89377e7159ac.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240!
> h6. unexpected
> !http://upload-images.jianshu.io/upload_images/2189341-f87ee2047f0cf120.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240!
> h5. reproduce code
> {code:scala}
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.{Dataset, Row, SparkSession}
>
> object TwiceCalculationReproducer {
>   private val separator = scala.reflect.io.File.separator
>   private val dirName = new java.io.File("").getAbsolutePath + separator + "testOutput"
>
>   System.setProperty("spark.app.name", "TestController")
>   System.setProperty("spark.master", "local[2]")
>   private val ss = SparkSession.builder().config(new SparkConf()).enableHiveSupport().getOrCreate()
>   ss.sparkContext.hadoopConfiguration.set("fs.defaultFS", "file:///")
>   private val sc = ss.sparkContext
>
>   def main(args: Array[String]): Unit = {
>     val fs = FileSystem.get(sc.hadoopConfiguration)
>     fs.delete(new Path(dirName), true)
>     Thread.sleep(1000)
>
>     /* expected: the UDF runs once per row */
>     val tableRaw = Dims.df
>     val tableNewExp = seqColumnGeneratorExpected(tableRaw)
>     tableNewExp.persist()
>     tableNewExp.show(10, 100)
>
>     /* unexpected: the UDF runs twice per row */
>     Thread.sleep(1000)
>     val tableNewUnexp = seqColumnGeneratorUnexpected(tableRaw)
>     tableNewUnexp.persist()
>     tableNewUnexp.show(10, 100)
>   }
>
>   /* normal */
>   def seqColumnGeneratorExpected[T](ds: Dataset[T]) =
>     ds.withColumn("seq_name", seqTokenUdf(col("id"), col("name")))
>
>   /* abnormal */
>   def seqColumnGeneratorUnexpected[T](ds: Dataset[T]) =
>     seqColumnGeneratorExpected(ds)
>       .filter(col("id") > 10)              //expected
>       .filter(length(col("name")) > 4)     //expected
>       .filter(size(col("seq_name")) > 1)   //unexpected when this filter is present
>
>   /* validator UDF: leaves a trace on every invocation */
>   def seqTokenUdf = udf {
>     (id: Int, name: String) => {
>       /* validator 1: console print */
>       println(name + "_" + id + "_" + System.currentTimeMillis())
>       /* validator 2: write a file, in case the executor does not print to the console */
>       val fs = FileSystem.get(sc.hadoopConfiguration)
>       fs.setWriteChecksum(false)
>       val fileName = Seq(dirName, name + "_" + System.currentTimeMillis.toString) mkString separator
>       fs.create(new Path(fileName))
>       /* return */
>       Seq[String](name, System.currentTimeMillis().toString)
>     }
>   }
>
>   /* test data mock */
>   object Dims {
>     private val structTypes = StructType(Seq(
>       StructField("id", IntegerType),
>       StructField("name", StringType)
>     ))
>     private val data = List(
>       Row(100, "file100"),
>       Row(101, "file101"),
>       Row(102, "file102")
>     )
>     private val rdd = sc.parallelize(data)
>     val df = ss.createDataFrame(rdd, structTypes)
>   }
> }
> {code}
> Is there any problem with the size() function?
> Kind regards,
> chenfh5

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
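For what it's worth, the behaviour the reporter observes is consistent with lazy evaluation in general: when a derived value appears both in a filter predicate and in the output, it can be computed once per use unless it is materialized before the filter is applied. The following plain-Scala sketch (no Spark; `DoubleEvalDemo`, `seqToken`, and the call counter are illustrative assumptions, not Spark internals) shows the same double-evaluation pattern and the materialize-first fix:

```scala
object DoubleEvalDemo {
  // stands in for the reporter's side-effecting seqTokenUdf;
  // `calls` counts how many times it is actually invoked
  var calls = 0
  def seqToken(name: String): Seq[String] = {
    calls += 1
    Seq(name, name.length.toString)
  }

  val names = List("file100", "file101", "file102")

  // analogue of filtering on size(col("seq_name")) > 1 without materializing first:
  // the derived value is recomputed in the predicate AND in the projection
  def unpersisted(): Int = {
    calls = 0
    names.filter(n => seqToken(n).size > 1).map(n => seqToken(n))
    calls // twice per surviving row
  }

  // analogue of materializing the derived column first, then filtering on the
  // already-computed value: the function runs exactly once per row
  def persisted(): Int = {
    calls = 0
    names.map(seqToken).filter(_.size > 1)
    calls // once per row
  }
}
```

Under this reading, the fix on the Spark side would be to persist and actually materialize the Dataset (e.g. with an action such as count()) before applying the size() filter, rather than after.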