[GitHub] [spark] zhengruifeng commented on pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations

2023-03-21 Thread via GitHub


zhengruifeng commented on PR #40263:
URL: https://github.com/apache/spark/pull/40263#issuecomment-1478764552

   @srowen sounds reasonable





[GitHub] [spark] zhengruifeng commented on pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations

2023-03-20 Thread via GitHub


zhengruifeng commented on PR #40263:
URL: https://github.com/apache/spark/pull/40263#issuecomment-1477193966

   TL;DR: I want to use a scalar subquery to optimize `FPGrowthModel.transform`. There are two options:
   
   1. create temp views and use `spark.sql`, see https://github.com/apache/spark/commit/63595ba03d9f18fe0b43bfb09f974ea50cb2c651;
   
   2. add `private[spark] def withScalarSubquery(colName: String, subquery: Dataset[_]): DataFrame`, which seems much more convenient, but I am not sure whether it is a proper way.
   
   A rough sketch of option 1 is shown below.
   
   cc @cloud-fan @HyukjinKwon
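   For context, a minimal sketch of option 1 (temp views plus `spark.sql`), with illustrative view/column names rather than the ones used in the linked commit:
   
   ```
   import org.apache.spark.sql.{DataFrame, SparkSession}
   
   // Option 1 sketch: register temp views and build the scalar subquery in SQL.
   // "dataset_view", "rules_view" and "prediction_rules" are placeholder names.
   def withRulesSubquery(spark: SparkSession, dataset: DataFrame, rules: DataFrame): DataFrame = {
     dataset.createOrReplaceTempView("dataset_view")
     rules.createOrReplaceTempView("rules_view")
     // The scalar subquery collects all association rules once, so every input
     // row can consume them without a broadcast nested loop join.
     spark.sql(
       """SELECT *,
         |       (SELECT collect_list(struct(antecedent, consequent)) FROM rules_view) AS prediction_rules
         |FROM dataset_view""".stripMargin)
   }
   ```
   
   Option 2 would expose the same scalar subquery through the proposed `withScalarSubquery(colName, subquery)` helper instead of going through SQL text.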





[GitHub] [spark] zhengruifeng commented on pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations

2023-03-16 Thread via GitHub


zhengruifeng commented on PR #40263:
URL: https://github.com/apache/spark/pull/40263#issuecomment-1473064231

   @srowen if the latest performance test seems fine, then I'd ask the SQL folks whether we can have a subquery method in the DataFrame APIs.





[GitHub] [spark] zhengruifeng commented on pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations

2023-03-14 Thread via GitHub


zhengruifeng commented on PR #40263:
URL: https://github.com/apache/spark/pull/40263#issuecomment-1467821846

   Yes, the `BroadcastNestedLoopJoin` is slower.
   
   Then I tried a subquery, and it is faster in both execution and analysis, but I have to create a temp view and write the SQL query; see https://github.com/apache/spark/pull/40263/commits/63595ba03d9f18fe0b43bfb09f974ea50cb2c651
   
   `model.transform(df).count()`: 4566 -> 4902 (ms)
   `model.transform(df).schema`: 2095 -> 298 (ms)
   
   So I'm trying to add a new method `Dataset.withScalarSubquery` in https://github.com/apache/spark/pull/40263/commits/c41ac094eb40520948d95108a78431694a33772d
   
   I'm not sure whether this is the correct way to support `ScalarSubquery` in the DataFrame APIs, but it is a real pain point for me.
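   For reference, a rough sketch of the per-row prediction expression that the plan below reflects, built from the existing higher-order functions; the helper name and the string item type are assumptions:
   
   ```
   import org.apache.spark.sql.Column
   import org.apache.spark.sql.functions._
   
   // `items` is the input items column; `rules` is the collected array of
   // struct(antecedent, consequent) produced by the scalar subquery.
   def predictionExpr(items: Column, rules: Column): Column = {
     val empty = array().cast("array<string>")  // assumed item type: string
     when(items.isNotNull,
       aggregate(rules, empty, (acc, rule) =>
         // if the row's items contain the whole antecedent, add the missing consequents
         when(forall(rule.getField("antecedent"), x => array_contains(items, x)),
           array_union(acc, array_except(rule.getField("consequent"), items)))
           .otherwise(acc)))
       .otherwise(empty)
   }
   ```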
   
   ```
   scala> model.transform(df).explain("extended")
   == Parsed Logical Plan ==
   'Project [items#5, CASE WHEN NOT isnull('items) THEN aggregate('prediction, cast(array() as array), lambdafunction(CASE WHEN forall(lambda 'y_1[antecedent], lambdafunction(array_contains('items, lambda 'x_2), lambda 'x_2, false)) THEN array_union(lambda 'x_0, array_except(lambda 'y_1[consequent], 'items)) ELSE lambda 'x_0 END, lambda 'x_0, lambda 'y_1, false), lambdafunction(lambda 'x_3, lambda 'x_3, false)) ELSE cast(array() as array) END AS prediction#74]
   +- Project [items#5, scalar-subquery#70 [] AS prediction#71]
      :  +- Aggregate [collect_list(struct(antecedent, antecedent#57, consequent, consequent#58), 0, 0) AS collect_list(struct(antecedent, consequent))#68]
      :     +- Filter (NOT isnull(antecedent#57) AND NOT isnull(consequent#58))
      :        +- LogicalRDD [antecedent#57, consequent#58, confidence#59, lift#60, support#61], false
      +- Project [value#2 AS items#5]
         +- SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
            +- ExternalRDD [obj#1]
   
   == Analyzed Logical Plan ==
   items: array, prediction: array
   Project [items#5, CASE WHEN NOT isnull(items#5) THEN aggregate(prediction#71, cast(array() as array), lambdafunction(CASE WHEN forall(lambda y_1#76.antecedent, lambdafunction(array_contains(items#5, lambda x_2#78), lambda x_2#78, false)) THEN array_union(lambda x_0#75, array_except(lambda y_1#76.consequent, items#5)) ELSE lambda x_0#75 END, lambda x_0#75, lambda y_1#76, false), lambdafunction(lambda x_3#77, lambda x_3#77, false)) ELSE cast(array() as array) END AS prediction#74]
   +- Project [items#5, scalar-subquery#70 [] AS prediction#71]
      :  +- Aggregate [collect_list(struct(antecedent, antecedent#57, consequent, consequent#58), 0, 0) AS collect_list(struct(antecedent, consequent))#68]
      :     +- Filter (NOT isnull(antecedent#57) AND NOT isnull(consequent#58))
      :        +- LogicalRDD [antecedent#57, consequent#58, confidence#59, lift#60, support#61], false
      +- Project [value#2 AS items#5]
         +- SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
            +- ExternalRDD [obj#1]
   
   == Optimized Logical Plan ==
   Project [items#5, CASE WHEN isnotnull(items#5) THEN aggregate(scalar-subquery#70 [], [], lambdafunction(CASE WHEN forall(lambda y_1#76.antecedent, lambdafunction(array_contains(items#5, lambda x_2#78), lambda x_2#78, false)) THEN array_union(lambda x_0#75, array_except(lambda y_1#76.consequent, items#5)) ELSE lambda x_0#75 END, lambda x_0#75, lambda y_1#76, false), lambdafunction(lambda x_3#77, lambda x_3#77, false)) ELSE [] END AS prediction#74]
   :  +- Aggregate [collect_list(struct(antecedent, antecedent#57, consequent, consequent#58), 0, 0) AS collect_list(struct(antecedent, consequent))#68]
   :     +- Project [antecedent#57, consequent#58]
   :        +- LogicalRDD [antecedent#57, consequent#58, confidence#59, lift#60, support#61], false
   +- InMemoryRelation [items#5], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Project [value#2 AS items#5]
            +- *(1) SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), StringType, ObjectType
   ```

[GitHub] [spark] zhengruifeng commented on pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations

2023-03-13 Thread via GitHub


zhengruifeng commented on PR #40263:
URL: https://github.com/apache/spark/pull/40263#issuecomment-1465662299

   I did a quick test with the dataset `T10I4D100K` from http://fimi.uantwerpen.be/data/
   
   fit:
   ```
   scala> val df = sc.textFile("/Users/ruifeng.zheng/.dev/data/T10I4D100K.dat").map(_.split(" ")).toDF("items")
   df: org.apache.spark.sql.DataFrame = [items: array]
   
   scala> df.count
   res16: Long = 10
   
   scala> val model = new FPGrowth().setMinSupport(0.01).setMinConfidence(0.01).fit(df)
   model: org.apache.spark.ml.fpm.FPGrowthModel = FPGrowthModel: uid=fpgrowth_92901252345a, numTrainingRecords=10
   
   scala> model.freqItemsets.count
   res17: Long = 385
   
   
   scala> model.associationRules.count
   res18: Long = 21 
   
   
   scala> model.save("/tmp/fpm.model")
   ```
   
   
   transformation:
   ```
   import org.apache.spark.ml.fpm._
   val df = sc.textFile("/Users/ruifeng.zheng/.dev/data/T10I4D100K.dat").map(_.split(" ")).toDF("items")
   df.cache()
   df.count()
   
   val model = FPGrowthModel.load("/tmp/fpm.model")
   model.transform(df).explain("extended")
   Seq.range(0, 100).foreach{i => model.transform(df).count()} // warms up
   val start = System.currentTimeMillis; Seq.range(0, 100).foreach{i => model.transform(df).count()}; val end = System.currentTimeMillis; end - start
   val start = System.currentTimeMillis; Seq.range(0, 100).foreach{i => model.transform(df).schema}; val end = System.currentTimeMillis; end - start
   ```
   
   master:
   ```
   scala> val model = FPGrowthModel.load("/tmp/fpm.model")
   model: org.apache.spark.ml.fpm.FPGrowthModel = FPGrowthModel: uid=fpgrowth_92901252345a, numTrainingRecords=10
   
   scala> model.transform(df).explain("extended")
   == Parsed Logical Plan ==
   'Project [items#5, UDF('items) AS prediction#70]
   +- Project [value#2 AS items#5]
      +- SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
         +- ExternalRDD [obj#1]
   
   == Analyzed Logical Plan ==
   items: array, prediction: array
   Project [items#5, UDF(items#5) AS prediction#70]
   +- Project [value#2 AS items#5]
      +- SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
         +- ExternalRDD [obj#1]
   
   == Optimized Logical Plan ==
   Project [items#5, UDF(items#5) AS prediction#70]
   +- InMemoryRelation [items#5], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Project [value#2 AS items#5]
            +- *(1) SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
               +- Scan[obj#1]
   
   == Physical Plan ==
   *(1) Project [items#5, UDF(items#5) AS prediction#70]
   +- InMemoryTableScan [items#5]
         +- InMemoryRelation [items#5], StorageLevel(disk, memory, deserialized, 1 replicas)
               +- *(1) Project [value#2 AS items#5]
                  +- *(1) SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
                     +- Scan[obj#1]
   
   
   scala> Seq.range(0, 100).foreach{i => model.transform(df).count()} // warms up
   
   scala> val start = System.currentTimeMillis; Seq.range(0, 100).foreach{i => model.transform(df).count()}; val end = System.currentTimeMillis; end - start
   start: Long = 1678692855532
   end: Long = 1678692860098
   res4: Long = 4566
   
   scala> val start = System.currentTimeMillis; Seq.range(0, 100).foreach{i => model.transform(df).schema}; val end = System.currentTimeMillis; end - start