[GitHub] [spark] zhengruifeng commented on pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations
zhengruifeng commented on PR #40263: URL: https://github.com/apache/spark/pull/40263#issuecomment-1478764552

@srowen sounds reasonable
[GitHub] [spark] zhengruifeng commented on pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations
zhengruifeng commented on PR #40263: URL: https://github.com/apache/spark/pull/40263#issuecomment-1477193966

TL;DR: I want to apply a scalar subquery to optimize `FPGrowthModel.transform`. There are two options:

1. create temp views and use `spark.sql`, see https://github.com/apache/spark/commit/63595ba03d9f18fe0b43bfb09f974ea50cb2c651;
2. add `private[spark] def withScalarSubquery(colName: String, subquery: Dataset[_]): DataFrame`, which seems much more convenient, but I am not sure whether it is a proper way.

cc @cloud-fan @HyukjinKwon
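[Editor's note] A minimal sketch of what option 1 might look like, assuming hypothetical view names (`dataset_view`, `rules_view`) and a simplified rules schema; the linked commit may structure this differently:

```
import org.apache.spark.sql.DataFrame

// Sketch of option 1: register both inputs as temp views so the scalar
// subquery can be expressed in SQL text via spark.sql.
def transformWithSqlSubquery(dataset: DataFrame, associationRules: DataFrame): DataFrame = {
  dataset.createOrReplaceTempView("dataset_view")
  associationRules.createOrReplaceTempView("rules_view")

  // The scalar subquery collapses all rules into a single array column,
  // so every input row can reference it without a join.
  dataset.sparkSession.sql(
    """SELECT *,
      |       (SELECT collect_list(struct(antecedent, consequent))
      |        FROM rules_view
      |        WHERE antecedent IS NOT NULL AND consequent IS NOT NULL) AS rules
      |FROM dataset_view
      |""".stripMargin)
}
```

The downside, as noted above, is that `transform` then has to manage temp-view names and SQL strings, which is what motivates option 2.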
[GitHub] [spark] zhengruifeng commented on pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations
zhengruifeng commented on PR #40263: URL: https://github.com/apache/spark/pull/40263#issuecomment-1473064231

@srowen if the latest performance test seems fine, then I'd ask the SQL folks whether we can have a subquery method in the DataFrame APIs.
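[Editor's note] One possible shape for such a method inside `Dataset.scala` is sketched below. This is only an illustration built on Catalyst internals (`ScalarSubquery`, `Project`, `UnresolvedStar`) and the existing private `withPlan` helper, not necessarily what the PR ends up adding:

```
// Hypothetical sketch; would live inside org.apache.spark.sql.Dataset.
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar
import org.apache.spark.sql.catalyst.expressions.{Alias, ScalarSubquery}
import org.apache.spark.sql.catalyst.plans.logical.Project

private[spark] def withScalarSubquery(colName: String, subquery: Dataset[_]): DataFrame =
  withPlan {
    // Keep all existing columns and append the scalar subquery result under `colName`.
    Project(
      Seq(UnresolvedStar(None), Alias(ScalarSubquery(subquery.logicalPlan), colName)()),
      logicalPlan)
  }
```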
[GitHub] [spark] zhengruifeng commented on pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations
zhengruifeng commented on PR #40263: URL: https://github.com/apache/spark/pull/40263#issuecomment-1467821846

Yes, the `BroadcastNestedLoopJoin` is slower. I then tried a subquery: compared with the join it is faster in both execution and analysis, but I have to create a temp view and write the SQL query, see https://github.com/apache/spark/pull/40263/commits/63595ba03d9f18fe0b43bfb09f974ea50cb2c651

- `model.transform(df).count()`: 4566 -> 4902
- `model.transform(df).schema`: 2095 -> 298

So I'm trying to add a new method `Dataset.withScalarSubquery` in https://github.com/apache/spark/pull/40263/commits/c41ac094eb40520948d95108a78431694a33772d. I am not sure whether it is the correct way to support `ScalarSubquery` in the DataFrame APIs, but it is a real pain point for me.

```
scala> model.transform(df).explain("extended")
== Parsed Logical Plan ==
'Project [items#5, CASE WHEN NOT isnull('items) THEN aggregate('prediction, cast(array() as array), lambdafunction(CASE WHEN forall(lambda 'y_1[antecedent], lambdafunction(array_contains('items, lambda 'x_2), lambda 'x_2, false)) THEN array_union(lambda 'x_0, array_except(lambda 'y_1[consequent], 'items)) ELSE lambda 'x_0 END, lambda 'x_0, lambda 'y_1, false), lambdafunction(lambda 'x_3, lambda 'x_3, false)) ELSE cast(array() as array) END AS prediction#74]
+- Project [items#5, scalar-subquery#70 [] AS prediction#71]
   :  +- Aggregate [collect_list(struct(antecedent, antecedent#57, consequent, consequent#58), 0, 0) AS collect_list(struct(antecedent, consequent))#68]
   :     +- Filter (NOT isnull(antecedent#57) AND NOT isnull(consequent#58))
   :        +- LogicalRDD [antecedent#57, consequent#58, confidence#59, lift#60, support#61], false
   +- Project [value#2 AS items#5]
      +- SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
         +- ExternalRDD [obj#1]

== Analyzed Logical Plan ==
items: array, prediction: array
Project [items#5, CASE WHEN NOT isnull(items#5) THEN aggregate(prediction#71, cast(array() as array), lambdafunction(CASE WHEN forall(lambda y_1#76.antecedent, lambdafunction(array_contains(items#5, lambda x_2#78), lambda x_2#78, false)) THEN array_union(lambda x_0#75, array_except(lambda y_1#76.consequent, items#5)) ELSE lambda x_0#75 END, lambda x_0#75, lambda y_1#76, false), lambdafunction(lambda x_3#77, lambda x_3#77, false)) ELSE cast(array() as array) END AS prediction#74]
+- Project [items#5, scalar-subquery#70 [] AS prediction#71]
   :  +- Aggregate [collect_list(struct(antecedent, antecedent#57, consequent, consequent#58), 0, 0) AS collect_list(struct(antecedent, consequent))#68]
   :     +- Filter (NOT isnull(antecedent#57) AND NOT isnull(consequent#58))
   :        +- LogicalRDD [antecedent#57, consequent#58, confidence#59, lift#60, support#61], false
   +- Project [value#2 AS items#5]
      +- SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
         +- ExternalRDD [obj#1]

== Optimized Logical Plan ==
Project [items#5, CASE WHEN isnotnull(items#5) THEN aggregate(scalar-subquery#70 [], [], lambdafunction(CASE WHEN forall(lambda y_1#76.antecedent, lambdafunction(array_contains(items#5, lambda x_2#78), lambda x_2#78, false)) THEN array_union(lambda x_0#75, array_except(lambda y_1#76.consequent, items#5)) ELSE lambda x_0#75 END, lambda x_0#75, lambda y_1#76, false), lambdafunction(lambda x_3#77, lambda x_3#77, false)) ELSE [] END AS prediction#74]
:  +- Aggregate [collect_list(struct(antecedent, antecedent#57, consequent, consequent#58), 0, 0) AS collect_list(struct(antecedent, consequent))#68]
:     +- Project [antecedent#57, consequent#58]
:        +- LogicalRDD [antecedent#57, consequent#58, confidence#59, lift#60, support#61], false
+- InMemoryRelation [items#5], StorageLevel(disk, memory, deserialized, 1 replicas)
      +- *(1) Project [value#2 AS items#5]
         +- *(1) SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), StringType,
```
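[Editor's note] For readers of the plan above, the prediction expression corresponds roughly to the following DataFrame-side code, using the proposed `withScalarSubquery` helper (private[spark], so callable only from Spark's own ML code) and higher-order functions instead of a UDF. Variable and column names follow the explain output, but this is a hedged sketch, not the PR's exact implementation:

```
import org.apache.spark.sql.functions._

// Collapse all (non-null) rules into a single-row aggregate.
val rulesAgg = associationRules
  .where(col("antecedent").isNotNull && col("consequent").isNotNull)
  .agg(collect_list(struct(col("antecedent"), col("consequent"))).as("rules"))

// Attach the aggregate as a scalar subquery column, then fold over the rules:
// a rule fires when its antecedent is contained in `items`, contributing the
// consequents that are not already present.
val predicted = dataset.toDF()
  .withScalarSubquery("rules", rulesAgg)
  .withColumn("prediction",
    when(col("items").isNotNull,
      aggregate(
        col("rules"),
        array().cast("array<string>"),
        (acc, rule) => when(
          forall(rule.getField("antecedent"), x => array_contains(col("items"), x)),
          array_union(acc, array_except(rule.getField("consequent"), col("items")))
        ).otherwise(acc)))
    .otherwise(array().cast("array<string>")))
  .drop("rules")
```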
[GitHub] [spark] zhengruifeng commented on pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations
zhengruifeng commented on PR #40263: URL: https://github.com/apache/spark/pull/40263#issuecomment-1465662299

I did a quick test with the dataset `T10I4D100K` from http://fimi.uantwerpen.be/data/

fit:
```
scala> val df = sc.textFile("/Users/ruifeng.zheng/.dev/data/T10I4D100K.dat").map(_.split(" ")).toDF("items")
df: org.apache.spark.sql.DataFrame = [items: array]

scala> df.count
res16: Long = 10

scala> val model = new FPGrowth().setMinSupport(0.01).setMinConfidence(0.01).fit(df)
model: org.apache.spark.ml.fpm.FPGrowthModel = FPGrowthModel: uid=fpgrowth_92901252345a, numTrainingRecords=10

scala> model.freqItemsets.count
res17: Long = 385

scala> model.associationRules.count
res18: Long = 21

scala> model.save("/tmp/fpm.model")
```

transformation:
```
import org.apache.spark.ml.fpm._

val df = sc.textFile("/Users/ruifeng.zheng/.dev/data/T10I4D100K.dat").map(_.split(" ")).toDF("items")
df.cache()
df.count()

val model = FPGrowthModel.load("/tmp/fpm.model")

model.transform(df).explain("extended")

Seq.range(0, 100).foreach{i => model.transform(df).count()} // warms up

val start = System.currentTimeMillis; Seq.range(0, 100).foreach{i => model.transform(df).count()}; val end = System.currentTimeMillis; end - start

val start = System.currentTimeMillis; Seq.range(0, 100).foreach{i => model.transform(df).schema}; val end = System.currentTimeMillis; end - start
```

master:
```
scala> val model = FPGrowthModel.load("/tmp/fpm.model")
model: org.apache.spark.ml.fpm.FPGrowthModel = FPGrowthModel: uid=fpgrowth_92901252345a, numTrainingRecords=10

scala> model.transform(df).explain("extended")
== Parsed Logical Plan ==
'Project [items#5, UDF('items) AS prediction#70]
+- Project [value#2 AS items#5]
   +- SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
      +- ExternalRDD [obj#1]

== Analyzed Logical Plan ==
items: array, prediction: array
Project [items#5, UDF(items#5) AS prediction#70]
+- Project [value#2 AS items#5]
   +- SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
      +- ExternalRDD [obj#1]

== Optimized Logical Plan ==
Project [items#5, UDF(items#5) AS prediction#70]
+- InMemoryRelation [items#5], StorageLevel(disk, memory, deserialized, 1 replicas)
      +- *(1) Project [value#2 AS items#5]
         +- *(1) SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
            +- Scan[obj#1]

== Physical Plan ==
*(1) Project [items#5, UDF(items#5) AS prediction#70]
+- InMemoryTableScan [items#5]
      +- InMemoryRelation [items#5], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Project [value#2 AS items#5]
               +- *(1) SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
                  +- Scan[obj#1]

scala> Seq.range(0, 100).foreach{i => model.transform(df).count()} // warms up

scala> val start = System.currentTimeMillis; Seq.range(0, 100).foreach{i => model.transform(df).count()}; val end = System.currentTimeMillis; end - start
start: Long = 1678692855532
end: Long = 1678692860098
res4: Long = 4566

scala> val start = System.currentTimeMillis; Seq.range(0, 100).foreach{i => model.transform(df).schema}; val end = System.currentTimeMillis; end -
```
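[Editor's note] The repeated `System.currentTimeMillis` pairs in the benchmark above could be wrapped in a small helper; a sketch under that assumption (the `timeMillis` name is illustrative, not from the PR):

```
// Hypothetical helper for the micro-benchmark above: run a block `n` times
// and return the total wall-clock time in milliseconds.
def timeMillis(n: Int)(f: => Unit): Long = {
  val start = System.currentTimeMillis()
  (0 until n).foreach(_ => f)
  System.currentTimeMillis() - start
}

// usage, mirroring the loops above:
// timeMillis(100) { model.transform(df).count() }
// timeMillis(100) { model.transform(df).schema }
```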