[jira] [Reopened] (SPARK-48463) MLLib function unable to handle nested data

2024-06-14 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu reopened SPARK-48463:

  Assignee: Weichen Xu

> MLLib function unable to handle nested data
> ---
>
> Key: SPARK-48463
> URL: https://issues.apache.org/jira/browse/SPARK-48463
> Project: Spark
>  Issue Type: Bug
>  Components: ML, MLlib
>Affects Versions: 3.5.1
>Reporter: Chhavi Bansal
>Assignee: Weichen Xu
>Priority: Major
>  Labels: ML, MLPipelines, mllib, nested
>
> I am trying to use a feature transformer on nested data after flattening it, 
> but it fails.
>  
> {code:java}
> val structureData = Seq(
>   Row(Row(10, 12), 1000),
>   Row(Row(12, 14), 4300),
>   Row( Row(37, 891), 1400),
>   Row(Row(8902, 12), 4000),
>   Row(Row(12, 89), 1000)
> )
> val structureSchema = new StructType()
>   .add("location", new StructType()
> .add("longitude", IntegerType)
> .add("latitude", IntegerType))
>   .add("salary", IntegerType) 
> val df = spark.createDataFrame(spark.sparkContext.parallelize(structureData), 
> structureSchema) 
> def flattenSchema(schema: StructType, prefix: String = null, prefixSelect: 
> String = null):
> Array[Column] = {
>   schema.fields.flatMap(f => {
> val colName = if (prefix == null) f.name else (prefix + "." + f.name)
> val colnameSelect = if (prefix == null) f.name else (prefixSelect + "." + 
> f.name)
> f.dataType match {
>   case st: StructType => flattenSchema(st, colName, colnameSelect)
>   case _ =>
> Array(col(colName).as(colnameSelect))
> }
>   })
> }
> val flattenColumns = flattenSchema(df.schema)
> val flattenedDf = df.select(flattenColumns: _*){code}
> Now using the string indexer on the DOT notation.
>  
> {code:java}
> val si = new 
> StringIndexer().setInputCol("location.longitude").setOutputCol("longitutdee")
> val pipeline = new Pipeline().setStages(Array(si))
> pipeline.fit(flattenedDf).transform(flattenedDf).show() {code}
> The above code fails:
> {code:java}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot 
> resolve column name "location.longitude" among (location.longitude, 
> location.latitude, salary); did you mean to quote the `location.longitude` 
> column?
>     at 
> org.apache.spark.sql.errors.QueryCompilationErrors$.cannotResolveColumnNameAmongFieldsError(QueryCompilationErrors.scala:2261)
>     at 
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$resolveException(Dataset.scala:258)
>     at org.apache.spark.sql.Dataset.$anonfun$resolve$1(Dataset.scala:250)
> . {code}
> This points to the same failure as when we try to select dot notation columns 
> in a spark dataframe, which is solved using BACKTICKS *`column.name`.* 
> [https://stackoverflow.com/a/51430335/11688337]
>  
> *So next,*
> I use backticks while defining the StringIndexer:
> {code:java}
> val si = new 
> StringIndexer().setInputCol("`location.longitude`").setOutputCol("longitutdee")
>  {code}
> In this case *it fails again* (with a different reason), inside the 
> StringIndexer code itself:
> {code:java}
> Exception in thread "main" org.apache.spark.SparkException: Input column 
> `location.longitude` does not exist.
>     at 
> org.apache.spark.ml.feature.StringIndexerBase.$anonfun$validateAndTransformSchema$2(StringIndexer.scala:128)
>     at 
> scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:244)
>     at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>     at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) 
> {code}
>  
> This blocks me from using feature transformation functions on nested columns. 
> Any help in solving this problem would be highly appreciated.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Resolved] (SPARK-48463) MLLib function unable to handle nested data

2024-06-14 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu resolved SPARK-48463.

Resolution: Not A Problem

> MLLib function unable to handle nested data
> ---
>
> Key: SPARK-48463
> URL: https://issues.apache.org/jira/browse/SPARK-48463
> Project: Spark
>  Issue Type: Bug
>  Components: ML, MLlib
>Affects Versions: 3.5.1
>Reporter: Chhavi Bansal
>Priority: Major
>  Labels: ML, MLPipelines, mllib, nested
>
> I am trying to use a feature transformer on nested data after flattening it, 
> but it fails.
>  
> {code:java}
> val structureData = Seq(
>   Row(Row(10, 12), 1000),
>   Row(Row(12, 14), 4300),
>   Row( Row(37, 891), 1400),
>   Row(Row(8902, 12), 4000),
>   Row(Row(12, 89), 1000)
> )
> val structureSchema = new StructType()
>   .add("location", new StructType()
> .add("longitude", IntegerType)
> .add("latitude", IntegerType))
>   .add("salary", IntegerType) 
> val df = spark.createDataFrame(spark.sparkContext.parallelize(structureData), 
> structureSchema) 
> def flattenSchema(schema: StructType, prefix: String = null, prefixSelect: 
> String = null):
> Array[Column] = {
>   schema.fields.flatMap(f => {
> val colName = if (prefix == null) f.name else (prefix + "." + f.name)
> val colnameSelect = if (prefix == null) f.name else (prefixSelect + "." + 
> f.name)
> f.dataType match {
>   case st: StructType => flattenSchema(st, colName, colnameSelect)
>   case _ =>
> Array(col(colName).as(colnameSelect))
> }
>   })
> }
> val flattenColumns = flattenSchema(df.schema)
> val flattenedDf = df.select(flattenColumns: _*){code}
> Now using the string indexer on the DOT notation.
>  
> {code:java}
> val si = new 
> StringIndexer().setInputCol("location.longitude").setOutputCol("longitutdee")
> val pipeline = new Pipeline().setStages(Array(si))
> pipeline.fit(flattenedDf).transform(flattenedDf).show() {code}
> The above code fails:
> {code:java}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot 
> resolve column name "location.longitude" among (location.longitude, 
> location.latitude, salary); did you mean to quote the `location.longitude` 
> column?
>     at 
> org.apache.spark.sql.errors.QueryCompilationErrors$.cannotResolveColumnNameAmongFieldsError(QueryCompilationErrors.scala:2261)
>     at 
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$resolveException(Dataset.scala:258)
>     at org.apache.spark.sql.Dataset.$anonfun$resolve$1(Dataset.scala:250)
> . {code}
> This points to the same failure as when we try to select dot notation columns 
> in a spark dataframe, which is solved using BACKTICKS *`column.name`.* 
> [https://stackoverflow.com/a/51430335/11688337]
>  
> *So next,*
> I use backticks while defining the StringIndexer:
> {code:java}
> val si = new 
> StringIndexer().setInputCol("`location.longitude`").setOutputCol("longitutdee")
>  {code}
> In this case *it fails again* (with a different reason), inside the 
> StringIndexer code itself:
> {code:java}
> Exception in thread "main" org.apache.spark.SparkException: Input column 
> `location.longitude` does not exist.
>     at 
> org.apache.spark.ml.feature.StringIndexerBase.$anonfun$validateAndTransformSchema$2(StringIndexer.scala:128)
>     at 
> scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:244)
>     at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>     at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) 
> {code}
>  
> This blocks me from using feature transformation functions on nested columns. 
> Any help in solving this problem would be highly appreciated.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Commented] (SPARK-48463) MLLib function unable to handle nested data

2024-06-11 Thread Weichen Xu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-48463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17854214#comment-17854214
 ] 

Weichen Xu commented on SPARK-48463:


Ah, got it. Then it is not supported :)

 

As a workaround, I think you can flatten the original dataframe and rename the 
new column to something like `location_longitude` (avoiding `.` in the column 
name); then it should work.
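
For illustration, a minimal PySpark sketch of that workaround (assuming `df` is 
the nested dataframe from the description; the renamed and output column names 
are only examples):

```python
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer

# Flatten the struct and use underscores instead of dots in the new names,
# so the ML stage only ever sees ordinary top-level column names.
flat_df = df.select(
    F.col("location.longitude").alias("location_longitude"),
    F.col("location.latitude").alias("location_latitude"),
    F.col("salary"),
)

si = StringIndexer(inputCol="location_longitude", outputCol="longitude_idx")
si.fit(flat_df).transform(flat_df).show()
```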

> MLLib function unable to handle nested data
> ---
>
> Key: SPARK-48463
> URL: https://issues.apache.org/jira/browse/SPARK-48463
> Project: Spark
>  Issue Type: Bug
>  Components: ML, MLlib
>Affects Versions: 3.5.1
>Reporter: Chhavi Bansal
>Priority: Major
>  Labels: ML, MLPipelines, mllib, nested
>
> I am trying to use a feature transformer on nested data after flattening it, 
> but it fails.
>  
> {code:java}
> val structureData = Seq(
>   Row(Row(10, 12), 1000),
>   Row(Row(12, 14), 4300),
>   Row( Row(37, 891), 1400),
>   Row(Row(8902, 12), 4000),
>   Row(Row(12, 89), 1000)
> )
> val structureSchema = new StructType()
>   .add("location", new StructType()
> .add("longitude", IntegerType)
> .add("latitude", IntegerType))
>   .add("salary", IntegerType) 
> val df = spark.createDataFrame(spark.sparkContext.parallelize(structureData), 
> structureSchema) 
> def flattenSchema(schema: StructType, prefix: String = null, prefixSelect: 
> String = null):
> Array[Column] = {
>   schema.fields.flatMap(f => {
> val colName = if (prefix == null) f.name else (prefix + "." + f.name)
> val colnameSelect = if (prefix == null) f.name else (prefixSelect + "." + 
> f.name)
> f.dataType match {
>   case st: StructType => flattenSchema(st, colName, colnameSelect)
>   case _ =>
> Array(col(colName).as(colnameSelect))
> }
>   })
> }
> val flattenColumns = flattenSchema(df.schema)
> val flattenedDf = df.select(flattenColumns: _*){code}
> Now using the string indexer on the DOT notation.
>  
> {code:java}
> val si = new 
> StringIndexer().setInputCol("location.longitude").setOutputCol("longitutdee")
> val pipeline = new Pipeline().setStages(Array(si))
> pipeline.fit(flattenedDf).transform(flattenedDf).show() {code}
> The above code fails:
> {code:java}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot 
> resolve column name "location.longitude" among (location.longitude, 
> location.latitude, salary); did you mean to quote the `location.longitude` 
> column?
>     at 
> org.apache.spark.sql.errors.QueryCompilationErrors$.cannotResolveColumnNameAmongFieldsError(QueryCompilationErrors.scala:2261)
>     at 
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$resolveException(Dataset.scala:258)
>     at org.apache.spark.sql.Dataset.$anonfun$resolve$1(Dataset.scala:250)
> . {code}
> This points to the same failure as when we try to select dot notation columns 
> in a spark dataframe, which is solved using BACKTICKS *`column.name`.* 
> [https://stackoverflow.com/a/51430335/11688337]
>  
> *So next,*
> I use backticks while defining the StringIndexer:
> {code:java}
> val si = new 
> StringIndexer().setInputCol("`location.longitude`").setOutputCol("longitutdee")
>  {code}
> In this case *it fails again* (with a different reason), inside the 
> StringIndexer code itself:
> {code:java}
> Exception in thread "main" org.apache.spark.SparkException: Input column 
> `location.longitude` does not exist.
>     at 
> org.apache.spark.ml.feature.StringIndexerBase.$anonfun$validateAndTransformSchema$2(StringIndexer.scala:128)
>     at 
> scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:244)
>     at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>     at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) 
> {code}
>  
> This blocks me from using feature transformation functions on nested columns. 
> Any help in solving this problem would be highly appreciated.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Commented] (SPARK-48463) MLLib function unable to handle nested data

2024-06-11 Thread Weichen Xu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-48463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17854053#comment-17854053
 ] 

Weichen Xu commented on SPARK-48463:


I think you don’t need to flatten the original dataframe.

According to the StringIndexer code,

```
private def getSelectedCols(dataset: Dataset[_], inputCols: Seq[String]): Seq[Column] = {
  inputCols.map { colName =>
    val col = dataset.col(colName)
    ...
  }
}
```

`.setInputCol("location.longitude")` should be able to work on the original 
dataframe with the nested column. But if you flatten it, the code breaks.
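
For illustration, a PySpark sketch of this suggestion, i.e. pointing the indexer 
at the nested field of the original, unflattened dataframe (assuming `df` is the 
dataframe from the description; whether the stage resolves the field this way is 
exactly what is being discussed here):

```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# Use the original dataframe (no flattening) and let dataset.col(...)
# resolve the nested field "location.longitude" directly.
si = StringIndexer(inputCol="location.longitude", outputCol="longitude_idx")
Pipeline(stages=[si]).fit(df).transform(df).show()
```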

> MLLib function unable to handle nested data
> ---
>
> Key: SPARK-48463
> URL: https://issues.apache.org/jira/browse/SPARK-48463
> Project: Spark
>  Issue Type: Bug
>  Components: ML, MLlib
>Affects Versions: 3.5.1
>Reporter: Chhavi Bansal
>Priority: Major
>  Labels: ML, MLPipelines, mllib, nested
>
> I am trying to use a feature transformer on nested data after flattening it, 
> but it fails.
>  
> {code:java}
> val structureData = Seq(
>   Row(Row(10, 12), 1000),
>   Row(Row(12, 14), 4300),
>   Row( Row(37, 891), 1400),
>   Row(Row(8902, 12), 4000),
>   Row(Row(12, 89), 1000)
> )
> val structureSchema = new StructType()
>   .add("location", new StructType()
> .add("longitude", IntegerType)
> .add("latitude", IntegerType))
>   .add("salary", IntegerType) 
> val df = spark.createDataFrame(spark.sparkContext.parallelize(structureData), 
> structureSchema) 
> def flattenSchema(schema: StructType, prefix: String = null, prefixSelect: 
> String = null):
> Array[Column] = {
>   schema.fields.flatMap(f => {
> val colName = if (prefix == null) f.name else (prefix + "." + f.name)
> val colnameSelect = if (prefix == null) f.name else (prefixSelect + "." + 
> f.name)
> f.dataType match {
>   case st: StructType => flattenSchema(st, colName, colnameSelect)
>   case _ =>
> Array(col(colName).as(colnameSelect))
> }
>   })
> }
> val flattenColumns = flattenSchema(df.schema)
> val flattenedDf = df.select(flattenColumns: _*){code}
> Now using the string indexer on the DOT notation.
>  
> {code:java}
> val si = new 
> StringIndexer().setInputCol("location.longitude").setOutputCol("longitutdee")
> val pipeline = new Pipeline().setStages(Array(si))
> pipeline.fit(flattenedDf).transform(flattenedDf).show() {code}
> The above code fails:
> {code:java}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot 
> resolve column name "location.longitude" among (location.longitude, 
> location.latitude, salary); did you mean to quote the `location.longitude` 
> column?
>     at 
> org.apache.spark.sql.errors.QueryCompilationErrors$.cannotResolveColumnNameAmongFieldsError(QueryCompilationErrors.scala:2261)
>     at 
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$resolveException(Dataset.scala:258)
>     at org.apache.spark.sql.Dataset.$anonfun$resolve$1(Dataset.scala:250)
> . {code}
> This points to the same failure as when we try to select dot notation columns 
> in a spark dataframe, which is solved using BACKTICKS *`column.name`.* 
> [https://stackoverflow.com/a/51430335/11688337]
>  
> *So next,*
> I use backticks while defining the StringIndexer:
> {code:java}
> val si = new 
> StringIndexer().setInputCol("`location.longitude`").setOutputCol("longitutdee")
>  {code}
> In this case *it fails again* (with a different reason), inside the 
> StringIndexer code itself:
> {code:java}
> Exception in thread "main" org.apache.spark.SparkException: Input column 
> `location.longitude` does not exist.
>     at 
> org.apache.spark.ml.feature.StringIndexerBase.$anonfun$validateAndTransformSchema$2(StringIndexer.scala:128)
>     at 
> scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:244)
>     at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>     at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) 
> {code}
>  
> This blocks me from using feature transformation functions on nested columns. 
> Any help in solving this problem would be highly appreciated.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Commented] (SPARK-48084) pyspark.ml.connect.evaluation not working in 3.5 client <> 4.0 server

2024-05-06 Thread Weichen Xu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-48084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844112#comment-17844112
 ] 

Weichen Xu commented on SPARK-48084:


This test error ({{pyspark.ml.connect.evaluation not working in 3.5 client <> 
4.0 server}}) is caused by a {{cloudpickle}} forward incompatibility; it is not 
related to the ML code.

> pyspark.ml.connect.evaluation not working in 3.5 client <> 4.0 server
> -
>
> Key: SPARK-48084
> URL: https://issues.apache.org/jira/browse/SPARK-48084
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML, PySpark
>Affects Versions: 4.0.0
>Reporter: Hyukjin Kwon
>Priority: Major
>
> {code}
> ==
> ERROR [3.966s]: test_regressor_evaluator 
> (pyspark.ml.tests.connect.test_connect_evaluation.EvaluationTestsOnConnect.test_regressor_evaluator)
> --
> Traceback (most recent call last):
>   File 
> "/home/runner/work/spark/spark-3.5/python/pyspark/ml/tests/connect/test_legacy_mode_evaluation.py",
>  line 69, in test_regressor_evaluator
> rmse = rmse_evaluator.evaluate(df1)
>
>   File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/base.py", 
> line 255, in evaluate
> return self._evaluate(dataset)
>^^^
>   File 
> "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/evaluation.py", 
> line 70, in _evaluate
> return aggregate_dataframe(
>
>   File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/util.py", 
> line 93, in aggregate_dataframe
> state = cloudpickle.loads(state)
> 
> AttributeError: Can't get attribute '_class_setstate' on <module 
> 'pyspark.cloudpickle.cloudpickle' from 
> '/home/runner/work/spark/spark-3.5/python/pyspark/cloudpickle/cloudpickle.py'>
> --
> {code}
> {code}
> ==
> ERROR [4.664s]: test_copy 
> (pyspark.ml.tests.connect.test_connect_tuning.CrossValidatorTestsOnConnect.test_copy)
> --
> Traceback (most recent call last):
>   File 
> "/home/runner/work/spark/spark-3.5/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py",
>  line 115, in test_copy
> cvModel = cv.fit(dataset)
>   ^^^
>   File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/base.py", 
> line 106, in fit
> return self._fit(dataset)
>^^
>   File 
> "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/tuning.py", line 
> 437, in _fit
> for j, metric in pool.imap_unordered(lambda f: f(), tasks):
>   File 
> "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/multiprocessing/pool.py",
>  line 873, in next
> raise value
>   File 
> "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/multiprocessing/pool.py",
>  line 125, in worker
> result = (True, func(*args, **kwds))
> ^^^
>   File 
> "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/tuning.py", line 
> 437, in <lambda>
> for j, metric in pool.imap_unordered(lambda f: f(), tasks):
>^^^
>   File 
> "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/tuning.py", line 
> 188, in single_task
> metric = evaluator.evaluate(
>  ^^^
>   File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/base.py", 
> line 255, in evaluate
> return self._evaluate(dataset)
>^^^
>   File 
> "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/evaluation.py", 
> line 70, in _evaluate
> return aggregate_dataframe(
>
>   File "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/util.py", 
> line 93, in aggregate_dataframe
> state = cloudpickle.loads(state)
> 
> AttributeError: Can't get attribute '_class_setstate' on <module 
> 'pyspark.cloudpickle.cloudpickle' from 
> '/home/runner/work/spark/spark-3.5/python/pyspark/cloudpickle/cloudpickle.py'>
> {code}
> {code}
> ==
> ERROR [3.938s]: test_fit_minimize_metric 
> (pyspark.ml.tests.connect.test_connect_tuning.CrossValidatorTestsOnConnect.test_fit_minimize_metric)
> --
> Traceback (most recent call last):
>   File 
> 

[jira] [Commented] (SPARK-48083) session.copyFromLocalToFs failure with 3.5 client <> 4.0 server

2024-05-06 Thread Weichen Xu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-48083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844111#comment-17844111
 ] 

Weichen Xu commented on SPARK-48083:


This is not an issue:
{{copyFromLocalToFs}} requires the Spark server to set 
{{spark.connect.copyFromLocalToFs.allowDestLocal}} to False, because the test 
can only use the local fs.

> session.copyFromLocalToFs failure with 3.5 client <> 4.0 server
> ---
>
> Key: SPARK-48083
> URL: https://issues.apache.org/jira/browse/SPARK-48083
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 4.0.0
>Reporter: Hyukjin Kwon
>Priority: Major
>  Labels: pull-request-available
>
> {code}
> ==
> ERROR [1.120s]: test_save_load 
> (pyspark.ml.tests.connect.test_connect_classification.ClassificationTestsOnConnect.test_save_load)
> --
> Traceback (most recent call last):
>   File 
> "/home/runner/work/spark/spark-3.5/python/pyspark/ml/tests/connect/test_legacy_mode_classification.py",
>  line 144, in test_save_load
> estimator.save(fs_path)
>   File 
> "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/io_utils.py", 
> line 248, in save
> _copy_dir_from_local_to_fs(tmp_local_dir, path)
>   File 
> "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/io_utils.py", 
> line 57, in _copy_dir_from_local_to_fs
> _copy_file_from_local_to_fs(file_path, dest_file_path)
>   File 
> "/home/runner/work/spark/spark-3.5/python/pyspark/ml/connect/io_utils.py", 
> line 39, in _copy_file_from_local_to_fs
> session.copyFromLocalToFs(local_path, dest_path)
>   File 
> "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/session.py", 
> line 756, in copyFromLocalToFs
> self._client.copy_from_local_to_fs(local_path, dest_path)
>   File 
> "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py",
>  line 1549, in copy_from_local_to_fs
> self._artifact_manager._add_forward_to_fs_artifacts(local_path, dest_path)
>   File 
> "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/artifact.py",
>  line 280, in _add_forward_to_fs_artifacts
> self._request_add_artifacts(requests)
>   File 
> "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/artifact.py",
>  line 259, in _request_add_artifacts
> response: proto.AddArtifactsResponse = self._retrieve_responses(requests)
>^^
>   File 
> "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/artifact.py",
>  line 256, in _retrieve_responses
> return self._stub.AddArtifacts(requests, metadata=self._metadata)
>^^
>   File 
> "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/grpc/_channel.py",
>  line 1536, in __call__
> return _end_unary_response_blocking(state, call, False, None)
>^^
>   File 
> "/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/grpc/_channel.py",
>  line 1006, in _end_unary_response_blocking
> raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
> ^^
> grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated 
> with:
>   status = StatusCode.INTERNAL
>   details = "Uploading artifact file to local file system destination 
> path is not supported."
>   debug_error_string = "UNKNOWN:Error received from peer  
> {grpc_message:"Uploading artifact file to local file system destination path 
> is not supported.", grpc_status:13, 
> created_time:"2024-05-01T03:01:32.[558](https://github.com/HyukjinKwon/spark/actions/runs/8904629949/job/24454181142#step:9:559)489983+00:00"}"
> >
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Resolved] (SPARK-47663) Add an end to end tests for checking if spark task works well with resources

2024-04-02 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu resolved SPARK-47663.

Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45794
[https://github.com/apache/spark/pull/45794]

> Add an end to end tests for checking if spark task works well with resources
> 
>
> Key: SPARK-47663
> URL: https://issues.apache.org/jira/browse/SPARK-47663
> Project: Spark
>  Issue Type: Test
>  Components: Tests
>Affects Versions: 4.0.0
>Reporter: Bobby Wang
>Assignee: Bobby Wang
>Priority: Trivial
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> To add an end to end task according to 
> https://github.com/apache/spark/pull/45528#discussion_r1545905575



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Assigned] (SPARK-47663) Add an end to end tests for checking if spark task works well with resources

2024-04-02 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu reassigned SPARK-47663:
--

Assignee: Bobby Wang

> Add an end to end tests for checking if spark task works well with resources
> 
>
> Key: SPARK-47663
> URL: https://issues.apache.org/jira/browse/SPARK-47663
> Project: Spark
>  Issue Type: Test
>  Components: Tests
>Affects Versions: 4.0.0
>Reporter: Bobby Wang
>Assignee: Bobby Wang
>Priority: Trivial
>  Labels: pull-request-available
>
> To add an end to end task according to 
> https://github.com/apache/spark/pull/45528#discussion_r1545905575



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Resolved] (SPARK-46812) Make `mapInPandas` / mapInArrow` support ResourceProfile (Stage-Level scheduling)

2024-02-18 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu resolved SPARK-46812.

Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 44852
[https://github.com/apache/spark/pull/44852]

> Make `mapInPandas` / mapInArrow` support ResourceProfile (Stage-Level 
> scheduling)
> -
>
> Key: SPARK-46812
> URL: https://issues.apache.org/jira/browse/SPARK-46812
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark, SQL
>Affects Versions: 4.0.0, 3.5.2
>Reporter: Bobby Wang
>Assignee: Bobby Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Supporting stage-level scheduling for DataFrame API mapInPandas and 
> mapInArrow is quite needed for Spark ML use cases.
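
For context, a rough PySpark sketch of what stage-level scheduling for 
mapInPandas could look like (assuming an existing DataFrame `df`; the exact 
parameter for attaching the ResourceProfile is an assumption here, not something 
stated in this ticket):

```python
from pyspark.resource import ResourceProfileBuilder, TaskResourceRequests

# Stage-level request: 1 CPU and 1 GPU per task (values are illustrative).
treqs = TaskResourceRequests().cpus(1).resource("gpu", 1)
rp = ResourceProfileBuilder().require(treqs).build

def infer(batches):
    for pdf in batches:   # each pdf is a pandas DataFrame
        yield pdf         # model inference would go here

# Assumed API shape: pass the profile so only this stage runs with it.
out = df.mapInPandas(infer, schema=df.schema, profile=rp)
```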



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Assigned] (SPARK-46812) Make `mapInPandas` / mapInArrow` support ResourceProfile (Stage-Level scheduling)

2024-02-18 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu reassigned SPARK-46812:
--

Assignee: Bobby Wang

> Make `mapInPandas` / mapInArrow` support ResourceProfile (Stage-Level 
> scheduling)
> -
>
> Key: SPARK-46812
> URL: https://issues.apache.org/jira/browse/SPARK-46812
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark, SQL
>Affects Versions: 4.0.0, 3.5.2
>Reporter: Bobby Wang
>Assignee: Bobby Wang
>Priority: Major
>  Labels: pull-request-available
>
> Supporting stage-level scheduling for DataFrame API mapInPandas and 
> mapInArrow is quite needed for Spark ML use cases.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Resolved] (SPARK-46361) Add spark dataset chunk read API (python only)

2024-01-05 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu resolved SPARK-46361.

Resolution: Won't Do

> Add spark dataset chunk read API (python only)
> --
>
> Key: SPARK-46361
> URL: https://issues.apache.org/jira/browse/SPARK-46361
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, Spark Core
>Affects Versions: 4.0.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>  Labels: pull-request-available
>
> *Design doc:*
> h1. 
> [https://docs.google.com/document/d/1LHzwCjm2SluHkta_08cM3jxFSgfF-niaCZbtIThG-H8/edit#heading=h.cxcvohcybvo2]
>  
> *Proposed API:*
> {code:java}
> def persist_dataframe_as_chunks(dataframe: DataFrame) -> list[str]:
> """
> Persist and materialize the spark dataframe as chunks; each chunk is an 
> arrow batch.
> It tries to persist data to spark worker memory first; if memory is not 
> sufficient,
> it falls back to spilling the data to the spark worker's local disk.
> Return the list of chunk ids.
> This function is only available when it is called from spark driver 
> process.
> """
> def read_chunk(chunk_id):
> """
> Read chunk by id, return this chunk as an arrow table.
> You can call this function from the spark driver, a spark python UDF worker,
> descendant process of spark driver, or descendant process of spark python 
> UDF worker.
>     """
> def unpersist_chunks(chunk_ids: list[str]) -> None:
> """
> Remove chunks by chunk ids.
> This function is only available when it is called from spark driver 
> process.
> """{code}
> *Motivation:*
> (1)
> In Ray on Spark, we want to support loading Ray data from an arbitrary Spark 
> DataFrame with an in-memory conversion.
> In Ray on Spark, a Ray datasource read task runs as a child process of a Ray 
> worker node, and Ray worker nodes are launched as child processes of PySpark 
> UDF workers.
> So the proposed API allows a descendant Python process of a PySpark UDF 
> worker to read a chunk of data from a given Spark DataFrame; based on this, 
> we can achieve an efficient "spark DataFrame" to "Ray dataset" conversion.
> (2)
> For spark torch distributor, we want to implement an efficient {{Torch 
> DataLoader}} that loads data from spark dataframe without saving spark 
> dataframe to cloud storage. This API makes it feasible. This issue has a 
> similar pattern to use-case (1)
> (3)
> For petastorm spark dataset converter (see 
> [https://www.databricks.com/blog/2020/06/16/simplify-data-conversion-from-apache-spark-to-tensorflow-and-pytorch.html])
>  , using the added API, we can achieve better performance



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Updated] (SPARK-46361) Add spark dataset chunk read API (python only)

2023-12-12 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu updated SPARK-46361:
---
Description: 
*Design doc:*
h1. 
[https://docs.google.com/document/d/1LHzwCjm2SluHkta_08cM3jxFSgfF-niaCZbtIThG-H8/edit#heading=h.cxcvohcybvo2]

 

*Proposed API:*
{code:java}
def persist_dataframe_as_chunks(dataframe: DataFrame) -> list[str]:
"""
Persist and materialize the spark dataframe as chunks; each chunk is an 
arrow batch.
It tries to persist data to spark worker memory first; if memory is not 
sufficient,
it falls back to spilling the data to the spark worker's local disk.
Return the list of chunk ids.
This function is only available when it is called from spark driver process.
"""

def read_chunk(chunk_id):
"""
Read chunk by id, return this chunk as an arrow table.
You can call this function from the spark driver, a spark python UDF worker,
descendant process of spark driver, or descendant process of spark python 
UDF worker.
    """

def unpersist_chunks(chunk_ids: list[str]) -> None:
"""
Remove chunks by chunk ids.
This function is only available when it is called from spark driver process.
"""{code}
*Motivation:*

(1)
In Ray on Spark, we want to support loading Ray data from an arbitrary Spark 
DataFrame with an in-memory conversion.

In Ray on Spark, a Ray datasource read task runs as a child process of a Ray 
worker node, and Ray worker nodes are launched as child processes of PySpark 
UDF workers.

So the proposed API allows a descendant Python process of a PySpark UDF worker 
to read a chunk of data from a given Spark DataFrame; based on this, we can 
achieve an efficient "spark DataFrame" to "Ray dataset" conversion.

(2)
For spark torch distributor, we want to implement an efficient {{Torch 
DataLoader}} that loads data from spark dataframe without saving spark 
dataframe to cloud storage. This API makes it feasible. This issue has a 
similar pattern to use-case (1)

(3)
For petastorm spark dataset converter (see 
[https://www.databricks.com/blog/2020/06/16/simplify-data-conversion-from-apache-spark-to-tensorflow-and-pytorch.html])
 , using the added API, we can achieve better performance

  was:
*Proposed API:*
{code:java}
def persist_dataframe_as_chunks(dataframe: DataFrame) -> list[str]:
"""
Persist the spark dataframe as chunks, each chunk is an arrow batch.
Return the list of chunk ids.
This function is only available when it is called from spark driver process.
"""

def read_chunk(chunk_id):
"""
Read chunk by id, return arrow batch data of this chunk.
You can call this function from the spark driver, a spark python UDF worker,
descendant process of spark driver, or descendant process of spark python 
UDF worker.
"""

def unpersist_chunks(chunk_ids: list[str]) -> None:
"""
Remove chunks by chunk ids.
This function is only available when it is called from spark driver process.
"""{code}
*Motivation:*

(1)
In Ray on Spark, we want to support loading Ray data from an arbitrary Spark 
DataFrame with an in-memory conversion.

In Ray on Spark, a Ray datasource read task runs as a child process of a Ray 
worker node, and Ray worker nodes are launched as child processes of PySpark 
UDF workers.

So the proposed API allows a descendant Python process of a PySpark UDF worker 
to read a chunk of data from a given Spark DataFrame; based on this, we can 
achieve an efficient "spark DataFrame" to "Ray dataset" conversion.

(2)
For petastorm spark dataset converter (see 
https://www.databricks.com/blog/2020/06/16/simplify-data-conversion-from-apache-spark-to-tensorflow-and-pytorch.html)
 , using the added API, we can achieve better performance


> Add spark dataset chunk read API (python only)
> --
>
> Key: SPARK-46361
> URL: https://issues.apache.org/jira/browse/SPARK-46361
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, Spark Core
>Affects Versions: 4.0.0
>Reporter: Weichen Xu
>Priority: Major
>  Labels: pull-request-available
>
> *Design doc:*
> h1. 
> [https://docs.google.com/document/d/1LHzwCjm2SluHkta_08cM3jxFSgfF-niaCZbtIThG-H8/edit#heading=h.cxcvohcybvo2]
>  
> *Proposed API:*
> {code:java}
> def persist_dataframe_as_chunks(dataframe: DataFrame) -> list[str]:
> """
> Persist and materialize the spark dataframe as chunks; each chunk is an 
> arrow batch.
> It tries to persist data to spark worker memory first; if memory is not 
> sufficient,
> it falls back to spilling the data to the spark worker's local disk.
> Return the list of chunk ids.
> This function is only available when it is called from spark driver 
> process.
> """
> def read_chunk(chunk_id):
> """
> Read chunk by id, return this chunk 

[jira] [Assigned] (SPARK-46361) Add spark dataset chunk read API (python only)

2023-12-12 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu reassigned SPARK-46361:
--

Assignee: Weichen Xu

> Add spark dataset chunk read API (python only)
> --
>
> Key: SPARK-46361
> URL: https://issues.apache.org/jira/browse/SPARK-46361
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, Spark Core
>Affects Versions: 4.0.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>  Labels: pull-request-available
>
> *Design doc:*
> h1. 
> [https://docs.google.com/document/d/1LHzwCjm2SluHkta_08cM3jxFSgfF-niaCZbtIThG-H8/edit#heading=h.cxcvohcybvo2]
>  
> *Proposed API:*
> {code:java}
> def persist_dataframe_as_chunks(dataframe: DataFrame) -> list[str]:
> """
> Persist and materialize the spark dataframe as chunks; each chunk is an 
> arrow batch.
> It tries to persist data to spark worker memory first; if memory is not 
> sufficient,
> it falls back to spilling the data to the spark worker's local disk.
> Return the list of chunk ids.
> This function is only available when it is called from spark driver 
> process.
> """
> def read_chunk(chunk_id):
> """
> Read chunk by id, return this chunk as an arrow table.
> You can call this function from the spark driver, a spark python UDF worker,
> descendant process of spark driver, or descendant process of spark python 
> UDF worker.
>     """
> def unpersist_chunks(chunk_ids: list[str]) -> None:
> """
> Remove chunks by chunk ids.
> This function is only available when it is called from spark driver 
> process.
> """{code}
> *Motivation:*
> (1)
> In Ray on Spark, we want to support loading Ray data from an arbitrary Spark 
> DataFrame with an in-memory conversion.
> In Ray on Spark, a Ray datasource read task runs as a child process of a Ray 
> worker node, and Ray worker nodes are launched as child processes of PySpark 
> UDF workers.
> So the proposed API allows a descendant Python process of a PySpark UDF 
> worker to read a chunk of data from a given Spark DataFrame; based on this, 
> we can achieve an efficient "spark DataFrame" to "Ray dataset" conversion.
> (2)
> For spark torch distributor, we want to implement an efficient {{Torch 
> DataLoader}} that loads data from spark dataframe without saving spark 
> dataframe to cloud storage. This API makes it feasible. This issue has a 
> similar pattern to use-case (1)
> (3)
> For petastorm spark dataset converter (see 
> [https://www.databricks.com/blog/2020/06/16/simplify-data-conversion-from-apache-spark-to-tensorflow-and-pytorch.html])
>  , using the added API, we can achieve better performance



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Updated] (SPARK-46361) Add spark dataset chunk read API (python only)

2023-12-11 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu updated SPARK-46361:
---
Description: 
*Proposed API:*
{code:java}
def persist_dataframe_as_chunks(dataframe: DataFrame) -> list[str]:
"""
Persist the spark dataframe as chunks, each chunk is an arrow batch.
Return the list of chunk ids.
This function is only available when it is called from spark driver process.
"""

def read_chunk(chunk_id):
"""
Read chunk by id, return arrow batch data of this chunk.
You can call this function from the spark driver, a spark python UDF worker,
descendant process of spark driver, or descendant process of spark python 
UDF worker.
"""

def unpersist_chunks(chunk_ids: list[str]) -> None:
"""
Remove chunks by chunk ids.
This function is only available when it is called from spark driver process.
"""{code}
*Motivation:*

(1)
In Ray on Spark, we want to support loading Ray data from an arbitrary Spark 
DataFrame with an in-memory conversion.

In Ray on Spark, a Ray datasource read task runs as a child process of a Ray 
worker node, and Ray worker nodes are launched as child processes of PySpark 
UDF workers.

So the proposed API allows a descendant Python process of a PySpark UDF worker 
to read a chunk of data from a given Spark DataFrame; based on this, we can 
achieve an efficient "spark DataFrame" to "Ray dataset" conversion.

(2)
For petastorm spark dataset converter (see 
https://www.databricks.com/blog/2020/06/16/simplify-data-conversion-from-apache-spark-to-tensorflow-and-pytorch.html)
 , using the added API, we can achieve better performance

  was:
*Proposed API:*
{code:java}
def persist_dataframe_as_chunks(dataframe: DataFrame) -> list[str]:
"""
Persist the spark dataframe as chunks, each chunk is an arrow batch.
Return the list of chunk ids.
This function is only available when it is called from spark driver process.
"""

def read_chunk(chunk_id):
"""
Read chunk by id, return arrow batch data of this chunk.
You can call this function from the spark driver, a spark python UDF worker,
descendant process of spark driver, or descendant process of spark python 
UDF worker.
"""

def unpersist_chunks(chunk_ids: list[str]) -> None:
"""
Remove chunks by chunk ids.
This function is only available when it is called from spark driver process.
"""{code}
*Motivation:*

In Ray on Spark, we want to support loading Ray data from an arbitrary Spark 
DataFrame with an in-memory conversion.

In Ray on Spark, a Ray datasource read task runs as a child process of a Ray 
worker node, and Ray worker nodes are launched as child processes of PySpark 
UDF workers.

So the proposed API allows a descendant Python process of a PySpark UDF worker 
to read a chunk of data from a given Spark DataFrame.


> Add spark dataset chunk read API (python only)
> --
>
> Key: SPARK-46361
> URL: https://issues.apache.org/jira/browse/SPARK-46361
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, Spark Core
>Affects Versions: 4.0.0
>Reporter: Weichen Xu
>Priority: Major
>  Labels: pull-request-available
>
> *Proposed API:*
> {code:java}
> def persist_dataframe_as_chunks(dataframe: DataFrame) -> list[str]:
> """
> Persist the spark dataframe as chunks, each chunk is an arrow batch.
> Return the list of chunk ids.
> This function is only available when it is called from spark driver 
> process.
> """
> def read_chunk(chunk_id):
> """
> Read chunk by id, return arrow batch data of this chunk.
> You can call this function from the spark driver, a spark python UDF worker,
> descendant process of spark driver, or descendant process of spark python 
> UDF worker.
> """
> def unpersist_chunks(chunk_ids: list[str]) -> None:
> """
> Remove chunks by chunk ids.
> This function is only available when it is called from spark driver 
> process.
> """{code}
> *Motivation:*
> (1)
> In Ray on Spark, we want to support loading Ray data from an arbitrary Spark 
> DataFrame with an in-memory conversion.
> In Ray on Spark, a Ray datasource read task runs as a child process of a Ray 
> worker node, and Ray worker nodes are launched as child processes of PySpark 
> UDF workers.
> So the proposed API allows a descendant Python process of a PySpark UDF 
> worker to read a chunk of data from a given Spark DataFrame; based on this, 
> we can achieve an efficient "spark DataFrame" to "Ray dataset" conversion.
> (2)
> For petastorm spark dataset converter (see 
> https://www.databricks.com/blog/2020/06/16/simplify-data-conversion-from-apache-spark-to-tensorflow-and-pytorch.html)
>  , using the added API, we can achieve better performance



--
This message was sent by 

[jira] [Created] (SPARK-46361) Add spark dataset chunk read API (python only)

2023-12-11 Thread Weichen Xu (Jira)
Weichen Xu created SPARK-46361:
--

 Summary: Add spark dataset chunk read API (python only)
 Key: SPARK-46361
 URL: https://issues.apache.org/jira/browse/SPARK-46361
 Project: Spark
  Issue Type: Improvement
  Components: PySpark, Spark Core
Affects Versions: 4.0.0
Reporter: Weichen Xu


*Proposed API:*
{code:java}
def persist_dataframe_as_chunks(dataframe: DataFrame) -> list[str]:
"""
Persist the spark dataframe as chunks, each chunk is an arrow batch.
Return the list of chunk ids.
This function is only available when it is called from spark driver process.
"""

def read_chunk(chunk_id):
"""
Read chunk by id, return arrow batch data of this chunk.
You can call this function from the spark driver, a spark python UDF worker,
descendant process of spark driver, or descendant process of spark python 
UDF worker.
"""

def unpersist_chunks(chunk_ids: list[str]) -> None:
"""
Remove chunks by chunk ids.
This function is only available when it is called from spark driver process.
"""{code}
*Motivation:*

In Ray on Spark, we want to support loading Ray data from an arbitrary Spark 
DataFrame with an in-memory conversion.

In Ray on Spark, a Ray datasource read task runs as a child process of a Ray 
worker node, and Ray worker nodes are launched as child processes of PySpark 
UDF workers.

So the proposed API allows a descendant Python process of a PySpark UDF worker 
to read a chunk of data from a given Spark DataFrame.
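
For illustration, a hypothetical driver-side usage sketch of the proposed 
functions (the import path is an assumption; only the three function names above 
come from this proposal, and an active SparkSession `spark` is assumed):

```python
# Hypothetical module path; the proposal above only names the three functions.
from pyspark.sql.chunk_api import (
    persist_dataframe_as_chunks,
    read_chunk,
    unpersist_chunks,
)

df = spark.range(100000).selectExpr("id", "id * 2 AS value")

chunk_ids = persist_dataframe_as_chunks(df)  # materialize as arrow batches
batch = read_chunk(chunk_ids[0])             # arrow batch data of one chunk
print(batch.num_rows)
unpersist_chunks(chunk_ids)                  # free the persisted chunks
```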



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Resolved] (SPARK-45397) Add vector assembler feature transformer

2023-10-11 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu resolved SPARK-45397.

Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 43199
[https://github.com/apache/spark/pull/43199]

> Add vector assembler feature transformer
> 
>
> Key: SPARK-45397
> URL: https://issues.apache.org/jira/browse/SPARK-45397
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.1
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Add vector assembler feature transformer



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Assigned] (SPARK-45397) Add vector assembler feature transformer

2023-10-11 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu reassigned SPARK-45397:
--

Assignee: Weichen Xu

> Add vector assembler feature transformer
> 
>
> Key: SPARK-45397
> URL: https://issues.apache.org/jira/browse/SPARK-45397
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.1
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>  Labels: pull-request-available
>
> Add vector assembler feature transformer



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Created] (SPARK-45397) Add vector assembler feature transformer

2023-10-03 Thread Weichen Xu (Jira)
Weichen Xu created SPARK-45397:
--

 Summary: Add vector assembler feature transformer
 Key: SPARK-45397
 URL: https://issues.apache.org/jira/browse/SPARK-45397
 Project: Spark
  Issue Type: Sub-task
  Components: Connect, ML, PySpark
Affects Versions: 3.5.1
Reporter: Weichen Xu


Add vector assembler feature transformer



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Created] (SPARK-45396) Add doc entry for `pyspark.ml.connect` module

2023-10-03 Thread Weichen Xu (Jira)
Weichen Xu created SPARK-45396:
--

 Summary: Add doc entry for `pyspark.ml.connect` module
 Key: SPARK-45396
 URL: https://issues.apache.org/jira/browse/SPARK-45396
 Project: Spark
  Issue Type: Sub-task
  Components: Connect, ML, PySpark
Affects Versions: 3.5.0
Reporter: Weichen Xu


Add doc entry for `pyspark.ml.connect` module



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Created] (SPARK-45130) Avoid Spark connect ML model to change input pandas dataframe

2023-09-12 Thread Weichen Xu (Jira)
Weichen Xu created SPARK-45130:
--

 Summary: Avoid Spark connect ML model to change input pandas 
dataframe
 Key: SPARK-45130
 URL: https://issues.apache.org/jira/browse/SPARK-45130
 Project: Spark
  Issue Type: Sub-task
  Components: Connect, ML, PySpark
Affects Versions: 4.0.0
Reporter: Weichen Xu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Updated] (SPARK-45130) Avoid Spark connect ML model to change input pandas dataframe

2023-09-12 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu updated SPARK-45130:
---
Description: Currently, 

> Avoid Spark connect ML model to change input pandas dataframe
> -
>
> Key: SPARK-45130
> URL: https://issues.apache.org/jira/browse/SPARK-45130
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 4.0.0
>Reporter: Weichen Xu
>Priority: Major
>
> Currently, 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Created] (SPARK-45129) Add pyspark "ml-connect" extras dependencies

2023-09-12 Thread Weichen Xu (Jira)
Weichen Xu created SPARK-45129:
--

 Summary: Add pyspark "ml-connect" extras dependencies
 Key: SPARK-45129
 URL: https://issues.apache.org/jira/browse/SPARK-45129
 Project: Spark
  Issue Type: Sub-task
  Components: Connect, ML, PySpark
Affects Versions: 4.0.0
Reporter: Weichen Xu


Add pyspark "ml-connect" extras dependencies

The dependencies include:

```

torch # used in model distributed training

torcheval # used in metric evaluation

scikit-learn # used in exporting feature transformer models.

```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Resolved] (SPARK-44908) Fix spark connect ML crossvalidator "foldCol" param

2023-08-23 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu resolved SPARK-44908.

Fix Version/s: 3.5.0
   4.0.0
   Resolution: Fixed

Issue resolved by pull request 42605
[https://github.com/apache/spark/pull/42605]

> Fix spark connect ML crossvalidator "foldCol" param
> ---
>
> Key: SPARK-44908
> URL: https://issues.apache.org/jira/browse/SPARK-44908
> Project: Spark
>  Issue Type: Bug
>  Components: Connect, ML
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Minor
> Fix For: 3.5.0, 4.0.0
>
>
> Fix spark connect ML crossvalidator "foldCol" param.
>  
> Currently it calls the `df.rdd` API, but that is not supported in Spark Connect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Resolved] (SPARK-44909) Skip starting torch distributor log streaming server when it is not available

2023-08-23 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu resolved SPARK-44909.

Fix Version/s: 3.5.0
   4.0.0
   Resolution: Fixed

Issue resolved by pull request 42606
[https://github.com/apache/spark/pull/42606]

> Skip starting torch distributor log streaming server when it is not available
> -
>
> Key: SPARK-44909
> URL: https://issues.apache.org/jira/browse/SPARK-44909
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 0.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
> Fix For: 3.5.0, 4.0.0
>
>
> Skip starting torch distributor log streaming server when it is not available.
>  
> In some cases, e.g., in a Databricks Connect cluster, a network limitation 
> causes the log streaming server to fail to start, but this does not need to 
> break the torch distributor training routine.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Assigned] (SPARK-44909) Skip starting torch distributor log streaming server when it is not available

2023-08-23 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu reassigned SPARK-44909:
--

Assignee: Weichen Xu

> Skip starting torch distributor log streaming server when it is not available
> -
>
> Key: SPARK-44909
> URL: https://issues.apache.org/jira/browse/SPARK-44909
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 0.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>
> Skip starting torch distributor log streaming server when it is not available.
>  
> In some cases, e.g., in a Databricks Connect cluster, a network limitation 
> causes the log streaming server to fail to start, but this does not need to 
> break the torch distributor training routine.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Created] (SPARK-44909) Skip starting torch distributor log streaming server when it is not available

2023-08-22 Thread Weichen Xu (Jira)
Weichen Xu created SPARK-44909:
--

 Summary: Skip starting torch distributor log streaming server when 
it is not available
 Key: SPARK-44909
 URL: https://issues.apache.org/jira/browse/SPARK-44909
 Project: Spark
  Issue Type: Improvement
  Components: ML
Affects Versions: 3.5
Reporter: Weichen Xu


Skip starting torch distributor log streaming server when it is not available.

 

In some cases, e.g., in a Databricks Connect cluster, a network limitation 
causes the log streaming server to fail to start, but this does not need to 
break the torch distributor training routine.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Created] (SPARK-44908) Fix spark connect ML crossvalidator "foldCol" param

2023-08-22 Thread Weichen Xu (Jira)
Weichen Xu created SPARK-44908:
--

 Summary: Fix spark connect ML crossvalidator "foldCol" param
 Key: SPARK-44908
 URL: https://issues.apache.org/jira/browse/SPARK-44908
 Project: Spark
  Issue Type: Bug
  Components: Connect, ML
Affects Versions: 3.5.0
Reporter: Weichen Xu


Fix spark connect ML crossvalidator "foldCol" param.

 

Currently it calls the `df.rdd` API, but that is not supported in Spark Connect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Assigned] (SPARK-44908) Fix spark connect ML crossvalidator "foldCol" param

2023-08-22 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu reassigned SPARK-44908:
--

Assignee: Weichen Xu

> Fix spark connect ML crossvalidator "foldCol" param
> ---
>
> Key: SPARK-44908
> URL: https://issues.apache.org/jira/browse/SPARK-44908
> Project: Spark
>  Issue Type: Bug
>  Components: Connect, ML
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Minor
>
> Fix spark connect ML crossvalidator "foldCol" param.
>  
> Currently it calls `df.rdd` APIs, which are not supported in Spark Connect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-44374) Add example code

2023-07-11 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu updated SPARK-44374:
---
Fix Version/s: 3.5.0

> Add example code
> 
>
> Key: SPARK-44374
> URL: https://issues.apache.org/jira/browse/SPARK-44374
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
> Fix For: 3.5.0
>
>
> Add example code for distributed ML <> spark connect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-44374) Add example code

2023-07-11 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu resolved SPARK-44374.

Resolution: Done

> Add example code
> 
>
> Key: SPARK-44374
> URL: https://issues.apache.org/jira/browse/SPARK-44374
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>
> Add example code for distributed ML <> spark connect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-44374) Add example code

2023-07-11 Thread Weichen Xu (Jira)
Weichen Xu created SPARK-44374:
--

 Summary: Add example code
 Key: SPARK-44374
 URL: https://issues.apache.org/jira/browse/SPARK-44374
 Project: Spark
  Issue Type: Sub-task
  Components: Connect, ML, PySpark
Affects Versions: 3.5.0
Reporter: Weichen Xu


Add example code for distributed ML <> spark connect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-44374) Add example code

2023-07-11 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu reassigned SPARK-44374:
--

Assignee: Weichen Xu

> Add example code
> 
>
> Key: SPARK-44374
> URL: https://issues.apache.org/jira/browse/SPARK-44374
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>
> Add example code for distributed ML <> spark connect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-42471) Distributed ML <> spark connect

2023-07-11 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu reassigned SPARK-42471:
--

Assignee: Weichen Xu

> Distributed ML <> spark connect
> ---
>
> Key: SPARK-42471
> URL: https://issues.apache.org/jira/browse/SPARK-42471
> Project: Spark
>  Issue Type: Umbrella
>  Components: Connect, ML
>Affects Versions: 3.4.0
>Reporter: Ruifeng Zheng
>Assignee: Weichen Xu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-43983) Implement cross validator estimator

2023-07-10 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu resolved SPARK-43983.

Fix Version/s: 3.5.0
   Resolution: Fixed

Issue resolved by pull request 41881
[https://github.com/apache/spark/pull/41881]

> Implement cross validator estimator
> ---
>
> Key: SPARK-43983
> URL: https://issues.apache.org/jira/browse/SPARK-43983
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
> Fix For: 3.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-44250) Implement classification evaluator

2023-07-04 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu resolved SPARK-44250.

Resolution: Done

> Implement classification evaluator
> --
>
> Key: SPARK-44250
> URL: https://issues.apache.org/jira/browse/SPARK-44250
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>
> Implement classification evaluator



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-44250) Implement classification evaluator

2023-06-29 Thread Weichen Xu (Jira)
Weichen Xu created SPARK-44250:
--

 Summary: Implement classification evaluator
 Key: SPARK-44250
 URL: https://issues.apache.org/jira/browse/SPARK-44250
 Project: Spark
  Issue Type: Sub-task
  Components: Connect, ML
Affects Versions: 3.5.0
Reporter: Weichen Xu


Implement classification evaluator



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-44250) Implement classification evaluator

2023-06-29 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu reassigned SPARK-44250:
--

Assignee: Weichen Xu

> Implement classification evaluator
> --
>
> Key: SPARK-44250
> URL: https://issues.apache.org/jira/browse/SPARK-44250
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>
> Implement classification evaluator



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-44100) Move namespace from `pyspark.mlv2` to `pyspark.ml.connect`

2023-06-20 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu resolved SPARK-44100.

Fix Version/s: 3.5.0
   Resolution: Fixed

Issue resolved by pull request 41666
[https://github.com/apache/spark/pull/41666]

> Move namespace from `pyspark.mlv2` to `pyspark.ml.connect`
> --
>
> Key: SPARK-44100
> URL: https://issues.apache.org/jira/browse/SPARK-44100
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
> Fix For: 3.5.0
>
>
> For distributed ML <> spark connect project code,
> move namespace from `pyspark.mlv2` to `pyspark.ml.connect`



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-44100) Move namespace from `pyspark.mlv2` to `pyspark.ml.connect`

2023-06-19 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu reassigned SPARK-44100:
--

Assignee: Weichen Xu

> Move namespace from `pyspark.mlv2` to `pyspark.ml.connect`
> --
>
> Key: SPARK-44100
> URL: https://issues.apache.org/jira/browse/SPARK-44100
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>
> For distributed ML <> spark connect project code,
> move namespace from `pyspark.mlv2` to `pyspark.ml.connect`



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-44100) Move namespace from `pyspark.mlv2` to `pyspark.ml.connect`

2023-06-19 Thread Weichen Xu (Jira)
Weichen Xu created SPARK-44100:
--

 Summary: Move namespace from `pyspark.mlv2` to `pyspark.ml.connect`
 Key: SPARK-44100
 URL: https://issues.apache.org/jira/browse/SPARK-44100
 Project: Spark
  Issue Type: Sub-task
  Components: Connect, ML, PySpark
Affects Versions: 3.5.0
Reporter: Weichen Xu


For distributed ML <> spark connect project code,

move namespace from `pyspark.mlv2` to `pyspark.ml.connect`
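For illustration, this is the import-path change user code would see after the rename; `LogisticRegression` is just an example class name assumed to be exposed by the module:

{code:python}
# Before the rename (pre-3.5 development code):
# from pyspark.mlv2.classification import LogisticRegression

# After the rename:
from pyspark.ml.connect.classification import LogisticRegression
{code}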



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-42501) High level design doc for Distributed ML <> spark connect

2023-06-19 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu updated SPARK-42501:
---
Description: 
Design doc:

https://docs.google.com/document/d/1LHzwCjm2SluHkta_08cM3jxFSgfF-niaCZbtIThG-H8/edit?usp=sharing

> High level design doc for Distributed ML <> spark connect
> -
>
> Key: SPARK-42501
> URL: https://issues.apache.org/jira/browse/SPARK-42501
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, Documentation, ML
>Affects Versions: 3.4.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>
> Design doc:
> https://docs.google.com/document/d/1LHzwCjm2SluHkta_08cM3jxFSgfF-niaCZbtIThG-H8/edit?usp=sharing



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-42501) High level design doc for Distributed ML <> spark connect

2023-06-19 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu resolved SPARK-42501.

Resolution: Done

> High level design doc for Distributed ML <> spark connect
> -
>
> Key: SPARK-42501
> URL: https://issues.apache.org/jira/browse/SPARK-42501
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, Documentation, ML
>Affects Versions: 3.4.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>
> Design doc:
> https://docs.google.com/document/d/1LHzwCjm2SluHkta_08cM3jxFSgfF-niaCZbtIThG-H8/edit?usp=sharing



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-42412) Initial prototype implementation for PySparkML

2023-06-19 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu resolved SPARK-42412.

Resolution: Done

> Initial prototype implementation for PySparkML
> --
>
> Key: SPARK-42412
> URL: https://issues.apache.org/jira/browse/SPARK-42412
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-43982) Implement pipeline estimator

2023-06-19 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu resolved SPARK-43982.

Fix Version/s: 3.5.0
   Resolution: Fixed

Issue resolved by pull request 41479
[https://github.com/apache/spark/pull/41479]

> Implement pipeline estimator
> 
>
> Key: SPARK-43982
> URL: https://issues.apache.org/jira/browse/SPARK-43982
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
> Fix For: 3.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-43981) Basic saving / loading implementation

2023-06-13 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu resolved SPARK-43981.

Fix Version/s: 3.5.0
   Resolution: Fixed

Issue resolved by pull request 41478
[https://github.com/apache/spark/pull/41478]

> Basic saving / loading implementation
> -
>
> Key: SPARK-43981
> URL: https://issues.apache.org/jira/browse/SPARK-43981
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
> Fix For: 3.5.0
>
>
> Support saving/loading for estimator / transformer / evaluator / model.
> We have some design goals:
>  * The model format is decoupled from spark, i.e. we can run model inference 
> without a spark service.
>  * We can save the model to either a local file system or a cloud storage file system.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-43790) Add API `copyLocalFileToHadoopFS`

2023-06-07 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu resolved SPARK-43790.

Fix Version/s: 3.5.0
   Resolution: Fixed

Issue resolved by pull request 41357
[https://github.com/apache/spark/pull/41357]

> Add API `copyLocalFileToHadoopFS`
> -
>
> Key: SPARK-43790
> URL: https://issues.apache.org/jira/browse/SPARK-43790
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
> Fix For: 3.5.0
>
>
> In new distributed spark ML module (designed to support spark connect and 
> support local inference)
> We need to save ML model to hadoop file system using custom binary file 
> format, the reason is:
>  * We often submit a spark application to spark cluster for running the 
> training model job, we need to save trained model to hadoop file system 
> before the spark application completes.
>  * But we want to support local model inference, that means if we save the 
> model by current spark DataFrame writer (e.g. parquet format), when loading 
> model we have to rely on the spark service. But we hope we can load model 
> without spark service. So we want the model being saved as the original 
> binary format that our ML code can handle.
>  
> So we need to add an API like `copyLocalFileToHadoopFS`,
> The implementation of `copyLocalFileToHadoopFS` could be:
>  
> (1) call `add_artifact` API to upload local file to spark driver (spark 
> connect already support this)
> (2) implement a pyspark (spark connect client) API: 
> `copy_artifact_to_hadoop_fs`, the API sends a command to spark driver to 
> request upload the artifact file to hadoop FS, *we need to design a spark 
> connect protobuf command message for this part.* In spark driver side, when 
> spark connect server received the request, it gets `sparkContext.hadoopConf` 
> and then using Hadoop FileSystem API to upload file to Hadoop FS.
> (3) call `copy_artifact_to_hadoop_fs` API to upload artifact file to Hadoop 
> FS.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-43097) Implement pyspark ML logistic regression estimator on top of torch distributor

2023-06-06 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu resolved SPARK-43097.

Fix Version/s: 3.5.0
   Resolution: Fixed

Issue resolved by pull request 41383
[https://github.com/apache/spark/pull/41383]

> Implement pyspark ML logistic regression estimator on top of torch distributor
> --
>
> Key: SPARK-43097
> URL: https://issues.apache.org/jira/browse/SPARK-43097
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
> Fix For: 3.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-43982) Implement pipeline estimator

2023-06-06 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu reassigned SPARK-43982:
--

Assignee: Weichen Xu

> Implement pipeline estimator
> 
>
> Key: SPARK-43982
> URL: https://issues.apache.org/jira/browse/SPARK-43982
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-43982) Implement pipeline estimator

2023-06-06 Thread Weichen Xu (Jira)
Weichen Xu created SPARK-43982:
--

 Summary: Implement pipeline estimator
 Key: SPARK-43982
 URL: https://issues.apache.org/jira/browse/SPARK-43982
 Project: Spark
  Issue Type: Sub-task
  Components: Connect, ML, PySpark
Affects Versions: 3.5.0
Reporter: Weichen Xu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-43983) Implement cross validator estimator

2023-06-06 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu reassigned SPARK-43983:
--

Assignee: Weichen Xu

> Implement cross validator estimator
> ---
>
> Key: SPARK-43983
> URL: https://issues.apache.org/jira/browse/SPARK-43983
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-43983) Implement cross validator estimator

2023-06-06 Thread Weichen Xu (Jira)
Weichen Xu created SPARK-43983:
--

 Summary: Implement cross validator estimator
 Key: SPARK-43983
 URL: https://issues.apache.org/jira/browse/SPARK-43983
 Project: Spark
  Issue Type: Sub-task
  Components: Connect, ML, PySpark
Affects Versions: 3.5.0
Reporter: Weichen Xu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-43981) Basic saving / loading implementation

2023-06-06 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu updated SPARK-43981:
---
Description: 
Support saving/loading for estimator / transformer / evaluator / model.

We have some design goals:
 * The model format is decoupled from spark, i.e. we can run model inference 
without a spark service.
 * We can save the model to either a local file system or a cloud storage file system.
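As a rough illustration of the first goal (a Spark-decoupled on-disk layout), not the actual implementation: model parameters could be written as plain JSON plus raw binary blobs so a lightweight loader needs no Spark at all. All names below are hypothetical.

{code:python}
# Hedged sketch of a Spark-decoupled model layout: metadata as JSON plus raw
# binary blobs, loadable without any Spark service. Names are hypothetical.
import json
import os
import numpy as np

def save_model_local(path, params, coefficients):
    os.makedirs(path, exist_ok=True)
    with open(os.path.join(path, "metadata.json"), "w") as f:
        json.dump(params, f)                       # e.g. {"numFeatures": 10}
    np.save(os.path.join(path, "coef.npy"), coefficients)

def load_model_local(path):
    with open(os.path.join(path, "metadata.json")) as f:
        params = json.load(f)
    coefficients = np.load(os.path.join(path, "coef.npy"))
    return params, coefficients

# Saving to cloud storage could then amount to copying this directory with the
# storage provider's SDK; no Spark is involved at load / inference time.
{code}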

> Basic saving / loading implementation
> -
>
> Key: SPARK-43981
> URL: https://issues.apache.org/jira/browse/SPARK-43981
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>
> Support saving/loading for estimator / transformer / evaluator / model.
> We have some design goals:
>  * The model format is decoupled from spark, i.e. we can run model inference 
> without a spark service.
>  * We can save the model to either a local file system or a cloud storage file system.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-43981) Basic saving / loading implementation

2023-06-06 Thread Weichen Xu (Jira)
Weichen Xu created SPARK-43981:
--

 Summary: Basic saving / loading implementation
 Key: SPARK-43981
 URL: https://issues.apache.org/jira/browse/SPARK-43981
 Project: Spark
  Issue Type: Sub-task
  Components: PySpark
Affects Versions: 3.5.0
Reporter: Weichen Xu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-43981) Basic saving / loading implementation

2023-06-06 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu reassigned SPARK-43981:
--

Assignee: Weichen Xu

> Basic saving / loading implementation
> -
>
> Key: SPARK-43981
> URL: https://issues.apache.org/jira/browse/SPARK-43981
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-43981) Basic saving / loading implementation

2023-06-06 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu updated SPARK-43981:
---
Component/s: Connect
 ML

> Basic saving / loading implementation
> -
>
> Key: SPARK-43981
> URL: https://issues.apache.org/jira/browse/SPARK-43981
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-43715) Add spark DataFrame binary file format writer

2023-06-06 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu resolved SPARK-43715.

Resolution: Won't Do

> Add spark DataFrame binary file format writer
> -
>
> Key: SPARK-43715
> URL: https://issues.apache.org/jira/browse/SPARK-43715
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>
> In new distributed spark ML module (designed to support spark connect and 
> support local inference)
> We need to save ML model to hadoop file system using custom binary file 
> format, the reason is:
>  * We often submit a spark application to spark cluster for running the 
> training model job, we need to save trained model to hadoop file system 
> before the spark application completes.
>  * But we want to support local model inference, that means if we save the 
> model by current spark DataFrame writer (e.g. parquet format), when loading 
> model we have to rely on the spark service. But we hope we can load model 
> without spark service. So we want the model being saved as the original 
> binary format that our ML code can handle.
> We already have reader API of "binaryFile" format, we need to add a writer 
> API:
> {*}Writer API{*}:
> Supposing we have a dataframe with schema:
> [file_path: String, content: binary],
> we can save the dataframe to a hadoop path, each row we will save it as a 
> file under the hadoop path, the saved file path is \{hadoop 
> path}/\{file_path}, "file_path" can be a multiple part path.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-43788) Enable SummarizerTests.test_summarize_dataframe for pandas 2.0.0.

2023-06-05 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu reassigned SPARK-43788:
--

Assignee: Weichen Xu

> Enable SummarizerTests.test_summarize_dataframe for pandas 2.0.0.
> -
>
> Key: SPARK-43788
> URL: https://issues.apache.org/jira/browse/SPARK-43788
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML
>Affects Versions: 3.5.0
>Reporter: Haejoon Lee
>Assignee: Weichen Xu
>Priority: Major
>
> Enable SummarizerTests.test_summarize_dataframe for pandas 2.0.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-43788) Enable SummarizerTests.test_summarize_dataframe for pandas 2.0.0.

2023-06-05 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu resolved SPARK-43788.

Fix Version/s: 3.5.0
   Resolution: Fixed

Issue resolved by pull request 41456
[https://github.com/apache/spark/pull/41456]

> Enable SummarizerTests.test_summarize_dataframe for pandas 2.0.0.
> -
>
> Key: SPARK-43788
> URL: https://issues.apache.org/jira/browse/SPARK-43788
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML
>Affects Versions: 3.5.0
>Reporter: Haejoon Lee
>Assignee: Weichen Xu
>Priority: Major
> Fix For: 3.5.0
>
>
> Enable SummarizerTests.test_summarize_dataframe for pandas 2.0.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-43784) Enable FeatureTests.test_max_abs_scaler for pandas 2.0.0.

2023-06-05 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu resolved SPARK-43784.

Fix Version/s: 3.5.0
   Resolution: Fixed

Issue resolved by pull request 41456
[https://github.com/apache/spark/pull/41456]

> Enable FeatureTests.test_max_abs_scaler for pandas 2.0.0.
> -
>
> Key: SPARK-43784
> URL: https://issues.apache.org/jira/browse/SPARK-43784
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML
>Affects Versions: 3.5.0
>Reporter: Haejoon Lee
>Assignee: Weichen Xu
>Priority: Major
> Fix For: 3.5.0
>
>
> Enable FeatureTests.test_max_abs_scaler for pandas 2.0.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-43784) Enable FeatureTests.test_max_abs_scaler for pandas 2.0.0.

2023-06-05 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu reassigned SPARK-43784:
--

Assignee: Weichen Xu

> Enable FeatureTests.test_max_abs_scaler for pandas 2.0.0.
> -
>
> Key: SPARK-43784
> URL: https://issues.apache.org/jira/browse/SPARK-43784
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML
>Affects Versions: 3.5.0
>Reporter: Haejoon Lee
>Assignee: Weichen Xu
>Priority: Major
>
> Enable FeatureTests.test_max_abs_scaler for pandas 2.0.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-43783) Enable FeatureTests.test_standard_scaler for pandas 2.0.0.

2023-06-05 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu resolved SPARK-43783.

Fix Version/s: 3.5.0
   Resolution: Fixed

Issue resolved by pull request 41456
[https://github.com/apache/spark/pull/41456]

> Enable FeatureTests.test_standard_scaler for pandas 2.0.0.
> --
>
> Key: SPARK-43783
> URL: https://issues.apache.org/jira/browse/SPARK-43783
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML
>Affects Versions: 3.5.0
>Reporter: Haejoon Lee
>Assignee: Weichen Xu
>Priority: Major
> Fix For: 3.5.0
>
>
> Fix `FeatureTests.test_standard_scaler` in 
> `python/pyspark/mlv2/tests/test_feature.py`



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-43783) Enable FeatureTests.test_standard_scaler for pandas 2.0.0.

2023-06-05 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu reassigned SPARK-43783:
--

Assignee: Weichen Xu

> Enable FeatureTests.test_standard_scaler for pandas 2.0.0.
> --
>
> Key: SPARK-43783
> URL: https://issues.apache.org/jira/browse/SPARK-43783
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML
>Affects Versions: 3.5.0
>Reporter: Haejoon Lee
>Assignee: Weichen Xu
>Priority: Major
>
> Fix `FeatureTests.test_standard_scaler` in 
> `python/pyspark/mlv2/tests/test_feature.py`



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-43790) Add API `copyLocalFileToHadoopFS`

2023-05-24 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu updated SPARK-43790:
---
Description: 
In new distributed spark ML module (designed to support spark connect and 
support local inference)

We need to save ML model to hadoop file system using custom binary file format, 
the reason is:
 * We often submit a spark application to spark cluster for running the 
training model job, we need to save trained model to hadoop file system before 
the spark application completes.
 * But we want to support local model inference, that means if we save the 
model by current spark DataFrame writer (e.g. parquet format), when loading 
model we have to rely on the spark service. But we hope we can load model 
without spark service. So we want the model being saved as the original binary 
format that our ML code can handle.

 

So we need to add an API like `copyLocalFileToHadoopFS`,

The implementation of `copyLocalFileToHadoopFS` could be:

 

(1) call the `add_artifact` API to upload the local file to the spark driver 
(spark connect already supports this)

(2) implement a pyspark (spark connect client) API, 
`copy_artifact_to_hadoop_fs`; the API sends a command to the spark driver 
requesting that the artifact file be uploaded to the hadoop FS. *We need to 
design a spark connect protobuf command message for this part.* On the spark 
driver side, when the spark connect server receives the request, it gets 
`sparkContext.hadoopConf` and then uses the Hadoop FileSystem API to upload the 
file to the Hadoop FS.

(3) call the `copy_artifact_to_hadoop_fs` API to upload the artifact file to the Hadoop FS.
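A sketch of what the client-side call sequence could look like; both calls use the names proposed in steps (1)-(3) above and are proposals, not existing PySpark APIs:

{code:python}
# Hedged sketch of the proposed client-side flow; the two methods named below
# follow steps (1)-(3) above and do not exist yet in this form.
def copy_local_file_to_hadoop_fs(spark, local_path, hadoop_dest_path):
    # (1) upload the local file to the spark driver as an artifact
    #     (spark connect already supports artifact upload)
    spark.add_artifact(local_path)
    # (2)+(3) send the new command to the driver; the spark connect server
    #     reads sparkContext.hadoopConf and uses the Hadoop FileSystem API
    #     to write the uploaded artifact to hadoop_dest_path
    spark.copy_artifact_to_hadoop_fs(local_path, hadoop_dest_path)
{code}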

 

 

  was:
In new distributed spark ML module (designed to support spark connect and 
support local inference)

We need to save ML model to hadoop file system using custom binary file format, 
the reason is:
 * We often submit a spark application to spark cluster for running the 
training model job, we need to save trained model to hadoop file system before 
the spark application completes.
 * But we want to support local model inference, that means if we save the 
model by current spark DataFrame writer (e.g. parquet format), when loading 
model we have to rely on the spark service. But we hope we can load model 
without spark service. So we want the model being saved as the original binary 
format that our ML code can handle.

 

So we need to add an API like `copyLocalFileToHadoopFS`,

The implementation could be:

 

(1) call `add_artifact` API to upload local file to spark driver (spark connect 
already support this)

(2) In spark driver side, we can get `sparkContext.hadoopConf` and then using 
Hadoop FileSystem API to upload file to Hadoop FS.

 


> Add API `copyLocalFileToHadoopFS`
> -
>
> Key: SPARK-43790
> URL: https://issues.apache.org/jira/browse/SPARK-43790
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>
> In new distributed spark ML module (designed to support spark connect and 
> support local inference)
> We need to save ML model to hadoop file system using custom binary file 
> format, the reason is:
>  * We often submit a spark application to spark cluster for running the 
> training model job, we need to save trained model to hadoop file system 
> before the spark application completes.
>  * But we want to support local model inference, that means if we save the 
> model by current spark DataFrame writer (e.g. parquet format), when loading 
> model we have to rely on the spark service. But we hope we can load model 
> without spark service. So we want the model being saved as the original 
> binary format that our ML code can handle.
>  
> So we need to add an API like `copyLocalFileToHadoopFS`,
> The implementation of `copyLocalFileToHadoopFS` could be:
>  
> (1) call `add_artifact` API to upload local file to spark driver (spark 
> connect already support this)
> (2) implement a pyspark (spark connect client) API: 
> `copy_artifact_to_hadoop_fs`, the API sends a command to spark driver to 
> request upload the artifact file to hadoop FS, *we need to design a spark 
> connect protobuf command message for this part.* In spark driver side, when 
> spark connect server received the request, it gets `sparkContext.hadoopConf` 
> and then using Hadoop FileSystem API to upload file to Hadoop FS.
> (3) call `copy_artifact_to_hadoop_fs` API to upload artifact file to Hadoop 
> FS.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-43790) Add API `copyLocalFileToHadoopFS`

2023-05-24 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu reassigned SPARK-43790:
--

Assignee: Weichen Xu

> Add API `copyLocalFileToHadoopFS`
> -
>
> Key: SPARK-43790
> URL: https://issues.apache.org/jira/browse/SPARK-43790
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>
> In new distributed spark ML module (designed to support spark connect and 
> support local inference)
> We need to save ML model to hadoop file system using custom binary file 
> format, the reason is:
>  * We often submit a spark application to spark cluster for running the 
> training model job, we need to save trained model to hadoop file system 
> before the spark application completes.
>  * But we want to support local model inference, that means if we save the 
> model by current spark DataFrame writer (e.g. parquet format), when loading 
> model we have to rely on the spark service. But we hope we can load model 
> without spark service. So we want the model being saved as the original 
> binary format that our ML code can handle.
>  
> So we need to add an API like `copyLocalFileToHadoopFS`,
> The implementation could be:
>  
> (1) call `add_artifact` API to upload local file to spark driver (spark 
> connect already support this)
> (2) In spark driver side, we can get `sparkContext.hadoopConf` and then using 
> Hadoop FileSystem API to upload file to Hadoop FS.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-43790) Add API `copyLocalFileToHadoopFS`

2023-05-24 Thread Weichen Xu (Jira)
Weichen Xu created SPARK-43790:
--

 Summary: Add API `copyLocalFileToHadoopFS`
 Key: SPARK-43790
 URL: https://issues.apache.org/jira/browse/SPARK-43790
 Project: Spark
  Issue Type: Sub-task
  Components: Connect, ML, PySpark
Affects Versions: 3.5.0
Reporter: Weichen Xu


In new distributed spark ML module (designed to support spark connect and 
support local inference)

We need to save ML model to hadoop file system using custom binary file format, 
the reason is:
 * We often submit a spark application to spark cluster for running the 
training model job, we need to save trained model to hadoop file system before 
the spark application completes.
 * But we want to support local model inference, that means if we save the 
model by current spark DataFrame writer (e.g. parquet format), when loading 
model we have to rely on the spark service. But we hope we can load model 
without spark service. So we want the model being saved as the original binary 
format that our ML code can handle.

 

So we need to add an API like `copyLocalFileToHadoopFS`,

The implementation could be:

 

(1) call `add_artifact` API to upload local file to spark driver (spark connect 
already support this)

(2) In spark driver side, we can get `sparkContext.hadoopConf` and then using 
Hadoop FileSystem API to upload file to Hadoop FS.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-43516) Basic estimator / transformer / model / evaluator interfaces and basic transformer / evaluator implementation

2023-05-24 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu updated SPARK-43516:
---
Description: 
* Define basic interfaces of Estimator / Transformer / Model / Evaluator; these 
interfaces are designed to support both spark connect and legacy spark mode, 
and to support train / transform / evaluate over either a spark dataframe or a 
pandas dataframe
 * Implement feature transformers {{{}MaxAbsScaler{}}} and {{StandardScaler}}
 * Implement a regressor evaluator {{RegressorEvaluator}} that supports MSE and 
R2 metric evaluation
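A brief usage sketch of the transformer style described above; the module path and parameter names are assumptions based on the pyspark.ml.connect layout this work introduces:

{code:python}
# Hedged usage sketch; module path and parameter names are assumptions based
# on the pyspark.ml.connect layout introduced by this ticket.
import pandas as pd
from pyspark.ml.connect.feature import MaxAbsScaler

train_df = pd.DataFrame({"features": [[1.0, -2.0], [3.0, 4.0], [-5.0, 0.5]]})

scaler = MaxAbsScaler(inputCol="features", outputCol="scaled_features")
model = scaler.fit(train_df)        # fit over a pandas DataFrame...
scaled = model.transform(train_df)  # ...or a spark DataFrame, per the design
{code}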

> Basic estimator / transformer / model / evaluator interfaces and basic 
> transformer / evaluator implementation
> -
>
> Key: SPARK-43516
> URL: https://issues.apache.org/jira/browse/SPARK-43516
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
> Fix For: 3.5.0
>
>
> * Define basic interfaces of Estimator / Transformer / Model / Evaluator; 
> these interfaces are designed to support both spark connect and legacy spark 
> mode, and to support train / transform / evaluate over either a spark 
> dataframe or a pandas dataframe
>  * Implement feature transformers {{{}MaxAbsScaler{}}} and {{StandardScaler}}
>  * Implement a regressor evaluator {{RegressorEvaluator}} that supports MSE 
> and R2 metric evaluation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-43516) Basic estimator / transformer / model / evaluator interfaces and basic transformer / evaluator implementation

2023-05-24 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu updated SPARK-43516:
---
Summary: Basic estimator / transformer / model / evaluator interfaces and 
basic transformer / evaluator implementation  (was: Basic estimator / 
transformer / model / evaluator interfaces)

> Basic estimator / transformer / model / evaluator interfaces and basic 
> transformer / evaluator implementation
> -
>
> Key: SPARK-43516
> URL: https://issues.apache.org/jira/browse/SPARK-43516
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
> Fix For: 3.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-43516) Basic estimator / transformer / model / evaluator interfaces

2023-05-24 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu resolved SPARK-43516.

Fix Version/s: 3.5.0
   Resolution: Fixed

Issue resolved by pull request 41176
[https://github.com/apache/spark/pull/41176]

> Basic estimator / transformer / model / evaluator interfaces
> 
>
> Key: SPARK-43516
> URL: https://issues.apache.org/jira/browse/SPARK-43516
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
> Fix For: 3.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-42501) High level design doc for Distributed ML <> spark connect

2023-05-23 Thread Weichen Xu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-42501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725614#comment-17725614
 ] 

Weichen Xu commented on SPARK-42501:


doc is linked.

> High level design doc for Distributed ML <> spark connect
> -
>
> Key: SPARK-42501
> URL: https://issues.apache.org/jira/browse/SPARK-42501
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, Documentation, ML
>Affects Versions: 3.4.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-42501) High level design doc for Distributed ML <> spark connect

2023-05-23 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu updated SPARK-42501:
---
Summary: High level design doc for Distributed ML <> spark connect  (was: 
High level design doc for Spark ML)

> High level design doc for Distributed ML <> spark connect
> -
>
> Key: SPARK-42501
> URL: https://issues.apache.org/jira/browse/SPARK-42501
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, Documentation, ML
>Affects Versions: 3.4.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-42471) Distributed ML <> spark connect

2023-05-23 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu updated SPARK-42471:
---
Summary: Distributed ML <> spark connect  (was: Feature parity: ML API in 
Spark Connect)

> Distributed ML <> spark connect
> ---
>
> Key: SPARK-42471
> URL: https://issues.apache.org/jira/browse/SPARK-42471
> Project: Spark
>  Issue Type: Umbrella
>  Components: Connect, ML
>Affects Versions: 3.4.0
>Reporter: Ruifeng Zheng
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-43715) Add spark DataFrame binary file format writer

2023-05-23 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu updated SPARK-43715:
---
Description: 
In new distributed spark ML module (designed to support spark connect and 
support local inference)

We need to save ML model to hadoop file system using custom binary file format, 
the reason is:
 * We often submit a spark application to spark cluster for running the 
training model job, we need to save trained model to hadoop file system before 
the spark application completes.
 * But we want to support local model inference, that means if we save the 
model by current spark DataFrame writer (e.g. parquet format), when loading 
model we have to rely on the spark service. But we hope we can load model 
without spark service. So we want the model being saved as the original binary 
format that our ML code can handle.

We already have a reader API for the "binaryFile" format; we need to add a writer API:

{*}Writer API{*}:

Supposing we have a dataframe with schema:

[file_path: String, content: binary],

we can save the dataframe to a hadoop path; each row is saved as a file under 
the hadoop path, the saved file path being \{hadoop path}/\{file_path}, where 
"file_path" can be a multi-part path.

  was:
In new distributed spark ML module (designed to support spark connect and 
support local inference)

We need to save ML model to hadoop file system using custom binary file format, 
the reason is:
 * We often submit a spark application to spark cluster for running the 
training model job, we need to save trained model to hadoop file system before 
the spark application completes.
 * But we want to support local model inference, that means if we save the 
model by current spark DataFrame writer (e.g. parquet format), when loading 
model we have to rely on the spark service. But we hope we can load model 
without spark service. So we want the model being saved as the original binary 
format that our ML code can handle.

so we need to add a DataFrame reader / writer format, that can load / save 
binary files, the API is like:

 

{*}Writer API{*}:

Supposing we have a dataframe with schema:

[file_path: String, content: binary],

we can save the dataframe to a hadoop path, each row we will save it as a file 
under the hadoop path, the saved file path is \{hadoop path}/\{file_path}, 
"file_path" can be a multiple part path.

 

{*}Reader API{*}:

`spark.read.format("binaryFileV2").load(...)`

 

It will return a spark dataframe; each row contains the file path and the file 
content as a binary string.
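For comparison, the reader side already exists today as the built-in "binaryFile" data source (the "binaryFileV2" name above was only the proposed counterpart); the path below is just a placeholder:

{code:python}
# The existing "binaryFile" reader (available since Spark 3.0); it returns one
# row per file with columns: path, modificationTime, length, content.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.format("binaryFile").load("/tmp/models/my_model_dir")
df.select("path", "length").show(truncate=False)
{code}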

 


> Add spark DataFrame binary file format writer
> -
>
> Key: SPARK-43715
> URL: https://issues.apache.org/jira/browse/SPARK-43715
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>
> In new distributed spark ML module (designed to support spark connect and 
> support local inference)
> We need to save ML model to hadoop file system using custom binary file 
> format, the reason is:
>  * We often submit a spark application to spark cluster for running the 
> training model job, we need to save trained model to hadoop file system 
> before the spark application completes.
>  * But we want to support local model inference, that means if we save the 
> model by current spark DataFrame writer (e.g. parquet format), when loading 
> model we have to rely on the spark service. But we hope we can load model 
> without spark service. So we want the model being saved as the original 
> binary format that our ML code can handle.
> We already have reader API of "binaryFile" format, we need to add a writer 
> API:
> {*}Writer API{*}:
> Supposing we have a dataframe with schema:
> [file_path: String, content: binary],
> we can save the dataframe to a hadoop path, each row we will save it as a 
> file under the hadoop path, the saved file path is \{hadoop 
> path}/\{file_path}, "file_path" can be a multiple part path.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-43715) Add spark DataFrame binary file format writer

2023-05-23 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu updated SPARK-43715:
---
Summary: Add spark DataFrame binary file format writer  (was: Add spark 
DataFrame binary file reader / writer)

> Add spark DataFrame binary file format writer
> -
>
> Key: SPARK-43715
> URL: https://issues.apache.org/jira/browse/SPARK-43715
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>
> In new distributed spark ML module (designed to support spark connect and 
> support local inference)
> We need to save ML model to hadoop file system using custom binary file 
> format, the reason is:
>  * We often submit a spark application to spark cluster for running the 
> training model job, we need to save trained model to hadoop file system 
> before the spark application completes.
>  * But we want to support local model inference, that means if we save the 
> model by current spark DataFrame writer (e.g. parquet format), when loading 
> model we have to rely on the spark service. But we hope we can load model 
> without spark service. So we want the model being saved as the original 
> binary format that our ML code can handle.
> so we need to add a DataFrame reader / writer format, that can load / save 
> binary files, the API is like:
>  
> {*}Writer API{*}:
> Supposing we have a dataframe with schema:
> [file_path: String, content: binary],
> we can save the dataframe to a hadoop path, each row we will save it as a 
> file under the hadoop path, the saved file path is \{hadoop 
> path}/\{file_path}, "file_path" can be a multiple part path.
>  
> {*}Reader API{*}:
> `spark.read.format("binaryFileV2").load(...)`
>  
> It will return a spark dataframe , each row contains the file path and the 
> file content binary string.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-43715) Add spark DataFrame binary file reader / writer

2023-05-22 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu reassigned SPARK-43715:
--

Assignee: Weichen Xu

> Add spark DataFrame binary file reader / writer
> ---
>
> Key: SPARK-43715
> URL: https://issues.apache.org/jira/browse/SPARK-43715
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>
> In new distributed spark ML module (designed to support spark connect and 
> support local inference)
> We need to save ML model to hadoop file system using custom binary file 
> format, the reason is:
>  * We often submit a spark application to spark cluster for running the 
> training model job, we need to save trained model to hadoop file system 
> before the spark application completes.
>  * But we want to support local model inference, that means if we save the 
> model by current spark DataFrame writer (e.g. parquet format), when loading 
> model we have to rely on the spark service. But we hope we can load model 
> without spark service. So we want the model being saved as the original 
> binary format that our ML code can handle.
> so we need to add a DataFrame reader / writer format, that can load / save 
> binary files, the API is like:
>  
> {*}Writer API{*}:
> Supposing we have a dataframe with schema:
> [file_path: String, content: binary],
> we can save the dataframe to a hadoop path, each row we will save it as a 
> file under the hadoop path, the saved file path is \{hadoop 
> path}/\{file_path}, "file_path" can be a multiple part path.
>  
> {*}Reader API{*}:
> `spark.read.format("binaryFileV2").load(...)`
>  
> It will return a spark dataframe , each row contains the file path and the 
> file content binary string.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-43715) Add spark DataFrame binary file reader / writer

2023-05-22 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu updated SPARK-43715:
---
Description: 
In new distributed spark ML module (designed to support spark connect and 
support local inference)

We need to save ML model to hadoop file system using custom binary file format, 
the reason is:
 * We often submit a spark application to spark cluster for running the 
training model job, we need to save trained model to hadoop file system before 
the spark application completes.
 * But we want to support local model inference, that means if we save the 
model by current spark DataFrame writer (e.g. parquet format), when loading 
model we have to rely on the spark service. But we hope we can load model 
without spark service. So we want the model being saved as the original binary 
format that our ML code can handle.

so we need to add a DataFrame reader / writer format, that can load / save 
binary files, the API is like:

 

{*}Writer API{*}:

Supposing we have a dataframe with schema:

[file_path: String, content: binary],

we can save the dataframe to a hadoop path, each row we will save it as a file 
under the hadoop path, the saved file path is \{hadoop path}/\{file_path}, 
"file_path" can be a multiple part path.

 

{*}Reader API{*}:

`spark.read.format("binaryFileV2").load(...)`

 

It will return a spark dataframe; each row contains the file path and the file 
content as a binary string.

 

  was:
In new distributed spark ML module (designed to support spark connect and 
support local inference)

We need to save ML model to hadoop file system using custom binary file format, 
the reason is:
 * The training model job is a spark job, we need to save trained model to 
hadoop file system after the job completes.
 * But we want to support local model inference, that means if we save the 
model by current spark DataFrame writer (e.g. parquet format), when loading 
model we have to rely on the spark service. But we hope we can load model 
without spark service. So we want the model being saved as the original binary 
format that our ML code can handle.

so we need to add a DataFrame reader / writer format, that can load / save 
binary files, the API is like:

 

{*}Writer API{*}:

Supposing we have a dataframe with schema:

[file_path: String, content: binary],

we can save the dataframe to a hadoop path, each row we will save it as a file 
under the hadoop path, the saved file path is \{hadoop path}/\{file_path}, 
"file_path" can be a multiple part path.

 

{*}Reader API{*}:

`spark.read.format("binaryFileV2").load(...)`

 

It will return a spark dataframe , each row contains the file path and the file 
content binary string.

 


> Add spark DataFrame binary file reader / writer
> ---
>
> Key: SPARK-43715
> URL: https://issues.apache.org/jira/browse/SPARK-43715
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Priority: Major
>
> In new distributed spark ML module (designed to support spark connect and 
> support local inference)
> We need to save ML model to hadoop file system using custom binary file 
> format, the reason is:
>  * We often submit a spark application to spark cluster for running the 
> training model job, we need to save trained model to hadoop file system 
> before the spark application completes.
>  * But we want to support local model inference, that means if we save the 
> model by current spark DataFrame writer (e.g. parquet format), when loading 
> model we have to rely on the spark service. But we hope we can load model 
> without spark service. So we want the model being saved as the original 
> binary format that our ML code can handle.
> so we need to add a DataFrame reader / writer format, that can load / save 
> binary files, the API is like:
>  
> {*}Writer API{*}:
> Supposing we have a dataframe with schema:
> [file_path: String, content: binary],
> we can save the dataframe to a hadoop path, each row we will save it as a 
> file under the hadoop path, the saved file path is \{hadoop 
> path}/\{file_path}, "file_path" can be a multiple part path.
>  
> {*}Reader API{*}:
> `spark.read.format("binaryFileV2").load(...)`
>  
> It will return a spark dataframe , each row contains the file path and the 
> file content binary string.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-43715) Add spark DataFrame binary file reader / writer

2023-05-22 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu updated SPARK-43715:
---
Description: 
In new distributed spark ML module (designed to support spark connect and 
support local inference)

We need to save ML model to hadoop file system using custom binary file format, 
the reason is:
 * The training model job is a spark job, we need to save trained model to 
hadoop file system after the job completes.
 * But we want to support local model inference, that means if we save the 
model by current spark DataFrame writer (e.g. parquet format), when loading 
model we have to rely on the spark service. But we hope we can load model 
without spark service. So we want the model being saved as the original binary 
format that our ML code can handle.

so we need to add a DataFrame reader / writer format, that can load / save 
binary files, the API is like:

 

{*}Writer API{*}:

Supposing we have a dataframe with schema:

[file_path: String, content: binary],

we can save the dataframe to a hadoop path, each row we will save it as a file 
under the hadoop path, the saved file path is \{hadoop path}/\{file_path}, 
"file_path" can be a multiple part path.

 

{*}Reader API{*}:

`spark.read.format("binaryFileV2").load(...)`

 

It will return a spark dataframe , each row contains the file path and the file 
content binary string.

 

  was:
In new distributed spark ML module (designed to support spark connect and 
support local inference)

We need to save ML model to hadoop file system using custom binary file format, 
the reason is:
 * The training model job is a spark job, we need to save trained model to 
hadoop file system after the job completes.
 * But we want to support local model inference, that means if we save the 
model by current spark DataFrame writer (e.g. parquet format), when loading 
model we have to rely on the spark service. But we hope we can load model 
without spark service. So we want the model being saved as the original binary 
format that our ML code can handle.

so we need to add a DataFrame reader / writer format, that can load / save 
binary files, the API is like:

 

{*}Writer API{*}:

Supposing we have a dataframe with schema:

[file_path: String, content: binary],

we can save the dataframe to a hadoop path, each row we will save it as a file 
under the hadoop path, the saved file path is \{hadoop path}/\{file_path}, 
"file_path" can be a multiple part path.

 

Reader API:

`spark.read.format("binaryFileV2").load(...)`

 

It will return a spark dataframe , each row contains the file path and the file 
content binary string.

 


> Add spark DataFrame binary file reader / writer
> ---
>
> Key: SPARK-43715
> URL: https://issues.apache.org/jira/browse/SPARK-43715
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Priority: Major
>
> In new distributed spark ML module (designed to support spark connect and 
> support local inference)
> We need to save ML model to hadoop file system using custom binary file 
> format, the reason is:
>  * The training model job is a spark job, we need to save trained model to 
> hadoop file system after the job completes.
>  * But we want to support local model inference, that means if we save the 
> model by current spark DataFrame writer (e.g. parquet format), when loading 
> model we have to rely on the spark service. But we hope we can load model 
> without spark service. So we want the model being saved as the original 
> binary format that our ML code can handle.
> so we need to add a DataFrame reader / writer format, that can load / save 
> binary files, the API is like:
>  
> {*}Writer API{*}:
> Supposing we have a dataframe with schema:
> [file_path: String, content: binary],
> we can save the dataframe to a hadoop path, each row we will save it as a 
> file under the hadoop path, the saved file path is \{hadoop 
> path}/\{file_path}, "file_path" can be a multiple part path.
>  
> {*}Reader API{*}:
> `spark.read.format("binaryFileV2").load(...)`
>  
> It will return a spark dataframe , each row contains the file path and the 
> file content binary string.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-43715) Add spark DataFrame binary file reader / writer

2023-05-22 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu updated SPARK-43715:
---
Description: 
In new distributed spark ML module (designed to support spark connect and 
support local inference)

We need to save ML model to hadoop file system using custom binary file format, 
the reason is:
 * The training model job is a spark job, we need to save trained model to 
hadoop file system after the job completes.
 * But we want to support local model inference, that means if we save the 
model by current spark DataFrame writer (e.g. parquet format), when loading 
model we have to rely on the spark service. But we hope we can load model 
without spark service. So we want the model being saved as the original binary 
format that our ML code can handle.

so we need to add a DataFrame reader / writer format, that can load / save 
binary files, the API is like:

 

{*}Writer API{*}:

Supposing we have a dataframe with schema:

[file_path: String, content: binary],

we can save the dataframe to a hadoop path, each row we will save it as a file 
under the hadoop path, the saved file path is \{hadoop path}/\{file_path}, 
"file_path" can be a multiple part path.

 

Reader API:

`spark.read.format("binaryFileV2").load(...)`

 

It will return a spark dataframe , each row contains the file path and the file 
content binary string.

 

  was:
In new distributed spark ML module (designed to support spark connect and 
support local inference)

We need to save ML model to hadoop file system using custom binary file format, 
the reason is:
 * The training model job is a spark job, we need to save trained model to 
hadoop file system after the job completes.
 * But we want to support local model inference, that means if we save the 
model by current spark DataFrame writer (e.g. parquet format), when loading 
model we have to rely on the spark service. But we hope we can load model 
without spark service. So we want the model being saved as the original binary 
format that our ML code can handle.

so we need to add a DataFrame reader / writer format, that can load / save 
binary files, the API is like:


> Add spark DataFrame binary file reader / writer
> ---
>
> Key: SPARK-43715
> URL: https://issues.apache.org/jira/browse/SPARK-43715
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Priority: Major
>
> In new distributed spark ML module (designed to support spark connect and 
> support local inference)
> We need to save ML model to hadoop file system using custom binary file 
> format, the reason is:
>  * The training model job is a spark job, we need to save trained model to 
> hadoop file system after the job completes.
>  * But we want to support local model inference, that means if we save the 
> model by current spark DataFrame writer (e.g. parquet format), when loading 
> model we have to rely on the spark service. But we hope we can load model 
> without spark service. So we want the model being saved as the original 
> binary format that our ML code can handle.
> so we need to add a DataFrame reader / writer format, that can load / save 
> binary files, the API is like:
>  
> {*}Writer API{*}:
> Supposing we have a dataframe with schema:
> [file_path: String, content: binary],
> we can save the dataframe to a hadoop path, each row we will save it as a 
> file under the hadoop path, the saved file path is \{hadoop 
> path}/\{file_path}, "file_path" can be a multiple part path.
>  
> Reader API:
> `spark.read.format("binaryFileV2").load(...)`
>  
> It will return a spark dataframe , each row contains the file path and the 
> file content binary string.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-43715) Add spark DataFrame binary file writer

2023-05-22 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu updated SPARK-43715:
---
Description: 
In new distributed spark ML module (designed to support spark connect and 
support local inference)

We need to save ML model to hadoop file system using custom binary file format, 
the reason is:
 * The training model job is a spark job, we need to save trained model to 
hadoop file system after the job completes.
 * But we want to support local model inference, that means if we save the 
model by current spark DataFrame writer (e.g. parquet format), when loading 
model we have to rely on the spark service. But we hope we can load model 
without spark service. So we want the model being saved as the original binary 
format that our ML code can handle.

so we need to add a DataFrame reader / writer format, that can load / save 
binary files, the API is like:

  was:In distributed 


> Add spark DataFrame binary file writer
> --
>
> Key: SPARK-43715
> URL: https://issues.apache.org/jira/browse/SPARK-43715
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Priority: Major
>
> In new distributed spark ML module (designed to support spark connect and 
> support local inference)
> We need to save ML model to hadoop file system using custom binary file 
> format, the reason is:
>  * The training model job is a spark job, we need to save trained model to 
> hadoop file system after the job completes.
>  * But we want to support local model inference, that means if we save the 
> model by current spark DataFrame writer (e.g. parquet format), when loading 
> model we have to rely on the spark service. But we hope we can load model 
> without spark service. So we want the model being saved as the original 
> binary format that our ML code can handle.
> so we need to add a DataFrame reader / writer format, that can load / save 
> binary files, the API is like:



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-43715) Add spark DataFrame binary file reader / writer

2023-05-22 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu updated SPARK-43715:
---
Summary: Add spark DataFrame binary file reader / writer  (was: Add spark 
DataFrame binary file writer)

> Add spark DataFrame binary file reader / writer
> ---
>
> Key: SPARK-43715
> URL: https://issues.apache.org/jira/browse/SPARK-43715
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Priority: Major
>
> In new distributed spark ML module (designed to support spark connect and 
> support local inference)
> We need to save ML model to hadoop file system using custom binary file 
> format, the reason is:
>  * The training model job is a spark job, we need to save trained model to 
> hadoop file system after the job completes.
>  * But we want to support local model inference, that means if we save the 
> model by current spark DataFrame writer (e.g. parquet format), when loading 
> model we have to rely on the spark service. But we hope we can load model 
> without spark service. So we want the model being saved as the original 
> binary format that our ML code can handle.
> so we need to add a DataFrame reader / writer format, that can load / save 
> binary files, the API is like:



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-43715) Add spark DataFrame binary file writer

2023-05-22 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu updated SPARK-43715:
---
Description: In distributed 

> Add spark DataFrame binary file writer
> --
>
> Key: SPARK-43715
> URL: https://issues.apache.org/jira/browse/SPARK-43715
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Priority: Major
>
> In distributed 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-43715) Add spark DataFrame binary file writer

2023-05-22 Thread Weichen Xu (Jira)
Weichen Xu created SPARK-43715:
--

 Summary: Add spark DataFrame binary file writer
 Key: SPARK-43715
 URL: https://issues.apache.org/jira/browse/SPARK-43715
 Project: Spark
  Issue Type: Sub-task
  Components: ML
Affects Versions: 3.5.0
Reporter: Weichen Xu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-43516) Basic estimator / transformer / model / evaluator interfaces

2023-05-15 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu reassigned SPARK-43516:
--

Assignee: Weichen Xu

> Basic estimator / transformer / model / evaluator interfaces
> 
>
> Key: SPARK-43516
> URL: https://issues.apache.org/jira/browse/SPARK-43516
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-43516) Basic estimator / transformer / model / evaluator interfaces

2023-05-15 Thread Weichen Xu (Jira)
Weichen Xu created SPARK-43516:
--

 Summary: Basic estimator / transformer / model / evaluator 
interfaces
 Key: SPARK-43516
 URL: https://issues.apache.org/jira/browse/SPARK-43516
 Project: Spark
  Issue Type: Sub-task
  Components: Connect, ML, PySpark
Affects Versions: 3.5.0
Reporter: Weichen Xu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-43081) Add torch distributor data loader that loads data from spark partition data

2023-04-30 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu resolved SPARK-43081.

Target Version/s: 3.5.0
  Resolution: Done

> Add torch distributor data loader that loads data from spark partition data
> ---
>
> Key: SPARK-43081
> URL: https://issues.apache.org/jira/browse/SPARK-43081
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>
> Add torch distributor data loader that loads data from spark partition data.
>  
> We can add 2 APIs like:
> Adds a `TorchDistributor` method API :
> {code:java}
>      def train_on_dataframe(self, train_function, spark_dataframe, *args, 
> **kwargs):
>         """
>         Runs distributed training using provided spark DataFrame as input 
> data.
>         You should ensure the input spark DataFrame have evenly divided 
> partitions,
>         and this method starts a barrier spark job that each spark task in 
> the job
>         process one partition of the input spark DataFrame.
>         Parameters
>         --
>         train_function :
>             Either a PyTorch function, PyTorch Lightning function that 
> launches distributed
>             training. Note that inside the function, you can call
>             `pyspark.ml.torch.distributor.get_spark_partition_data_loader` 
> API to get a torch
>             data loader, the data loader loads data from the corresponding 
> partition of the
>             input spark DataFrame.
>         spark_dataframe :
>             An input spark DataFrame that can be used in PyTorch 
> `train_function` function.
>             See `train_function` argument doc for details.
>         args :
>             `args` need to be the input parameters to `train_function` 
> function. It would look like
>             >>> model = distributor.run(train, 1e-3, 64)
>             where train is a function and 1e-3 and 64 are regular numeric 
> inputs to the function.
>         kwargs :
>             `kwargs` need to be the keyword input parameters to 
> `train_function` function.
>             It would look like
>             >>> model = distributor.run(train, tol=1e-3, max_iter=64)
>             where train is a function that has 2 arguments `tol` and 
> `max_iter`.
>         Returns
>         ---
>             Returns the output of `train_function` called with args inside 
> spark rank 0 task.
>         """{code}
>  
> Adds a loader API:
>  
> {code:java}
>  def get_spark_partition_data_loader(num_samples, batch_size, prefetch=2):
>     """
>     This function must be called inside the `train_function` where 
> `train_function`
>     is the input argument of `TorchDistributor.train_on_dataframe`.
>     The function returns a pytorch data loader that loads data from
>     the corresponding spark partition data.
>     Parameters
>     --
>     num_samples :
>         Number of samples to generate per epoch. If `num_samples` is less 
> than the number of
>         rows in the spark partition, it generates the first `num_samples` rows 
> of
>         the spark partition, if `num_samples` is greater than the number of
>         rows in the spark partition, then after the iterator loaded all rows 
> from the partition,
>         it wraps round back to the first row.
>     batch_size:
>         How many samples per batch to load.
>     prefetch:
>         Number of batches loaded in advance.
>     """{code}
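A rough usage sketch of the two proposed APIs follows. The import paths, the format of the batches yielded by the loader, and `train_df` (assumed to be an existing DataFrame with evenly divided partitions) follow the docstrings above and are assumptions, not a final implementation.

{code:python}
import torch
from pyspark.ml.torch.distributor import TorchDistributor

def train(learning_rate, batch_size):
    # Proposed API: inside the train function, fetch a data loader over the
    # spark partition assigned to this barrier task.
    from pyspark.ml.torch.distributor import get_spark_partition_data_loader

    loader = get_spark_partition_data_loader(
        num_samples=10_000, batch_size=batch_size, prefetch=2
    )
    model = torch.nn.Linear(4, 1)
    optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)
    for features, label in loader:  # assumed (features, label) batch format
        optimizer.zero_grad()
        loss = torch.nn.functional.mse_loss(model(features), label)
        loss.backward()
        optimizer.step()
    return model

# Proposed API: starts a barrier spark job; each spark task processes one
# partition of train_df, and the rank-0 task's return value is returned here.
distributor = TorchDistributor(num_processes=2, use_gpu=False)
model = distributor.train_on_dataframe(train, train_df, 1e-3, 64)
{code}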



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-43289) PySpark UDF supports python package dependencies

2023-04-25 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu reassigned SPARK-43289:
--

Assignee: Weichen Xu

> PySpark UDF supports python package dependencies
> 
>
> Key: SPARK-43289
> URL: https://issues.apache.org/jira/browse/SPARK-43289
> Project: Spark
>  Issue Type: New Feature
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>
> h3. Requirements
>  
> Make the pyspark UDF support annotating python dependencies and when 
> executing UDF, the UDF worker creates a new python environment with provided 
> python dependencies.
> h3. Motivation
>  
> We have two major cases:
>  
>  * For spark connect case, the client python environment is very likely to be 
> different with pyspark server side python environment, this causes user's UDF 
> function execution failure in pyspark server side.
>  * Some machine learning third-party library (e.g. MLflow) requires pyspark 
> UDF supporting  dependencies, because in ML cases, we need to run model 
> inference by pyspark UDF in the exactly the same python environment that 
> trains the model. Currently MLflow supports it by creating a child python 
> process in pyspark UDF worker, and redirecting all UDF input data to the 
> child python process to run model inference, this way it causes significant 
> overhead, if pyspark UDF support builtin python dependency management then we 
> don't need such poorly performing approach.
>  
> h3. Proposed API
> ```
> @pandas_udf("string", pip_requirements=...)
> ```
> `pip_requirements` argument means either an iterable of pip requirement 
> strings (e.g. ``["scikit-learn", "-r /path/to/req2.txt", "-c 
> /path/to/constraints.txt"]``) or the string path to a pip requirements file 
> path on the local filesystem (e.g. ``"/path/to/requirements.txt"``) 
> represents the pip requirements for the python UDF.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-43289) PySpark UDF supports python package dependencies

2023-04-25 Thread Weichen Xu (Jira)
Weichen Xu created SPARK-43289:
--

 Summary: PySpark UDF supports python package dependencies
 Key: SPARK-43289
 URL: https://issues.apache.org/jira/browse/SPARK-43289
 Project: Spark
  Issue Type: New Feature
  Components: Connect, ML, PySpark
Affects Versions: 3.5.0
Reporter: Weichen Xu


h3. Requirements

 

Make pyspark UDFs support annotating python dependencies, so that when a UDF is 
executed, the UDF worker creates a new python environment with the provided 
dependencies.
h3. Motivation

 

We have two major cases:

 
 * For the Spark Connect case, the client python environment is very likely to 
differ from the pyspark server-side python environment, which causes the user's 
UDF to fail on the server side.
 * Some machine learning third-party libraries (e.g. MLflow) need pyspark UDFs 
to support dependencies, because in ML cases model inference must run in 
exactly the same python environment that trained the model. Currently MLflow 
supports this by creating a child python process inside the pyspark UDF worker 
and redirecting all UDF input data to that child process, which causes 
significant overhead. If pyspark UDFs supported built-in python dependency 
management, such a poorly performing approach would not be needed.

 
h3. Proposed API

```

@pandas_udf("string", pip_requirements=...)

```

The `pip_requirements` argument is either an iterable of pip requirement 
strings (e.g. ``["scikit-learn", "-r /path/to/req2.txt", "-c 
/path/to/constraints.txt"]``) or the string path to a pip requirements file on 
the local filesystem (e.g. ``"/path/to/requirements.txt"``), and represents the 
pip requirements for the python UDF.
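For illustration only, the proposed argument might be used as in the sketch below; `pip_requirements` is not part of current pyspark, and the package pin is a made-up example.

{code:python}
import pandas as pd
from pyspark.sql.functions import pandas_udf

# Proposed API sketch: the UDF worker would build an isolated python
# environment from these requirements before executing the UDF.
@pandas_udf("double", pip_requirements=["scikit-learn==1.3.0"])
def score(features: pd.Series) -> pd.Series:
    # Imported from the per-UDF environment, which may differ from the
    # client's or the server's default python environment.
    from sklearn.preprocessing import minmax_scale
    return pd.Series(minmax_scale(features))
{code}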



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-43097) Implement pyspark ML logistic regression estimator on top of torch distributor

2023-04-11 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu reassigned SPARK-43097:
--

Assignee: Weichen Xu

> Implement pyspark ML logistic regression estimator on top of torch distributor
> --
>
> Key: SPARK-43097
> URL: https://issues.apache.org/jira/browse/SPARK-43097
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-43097) Implement pyspark ML logistic regression estimator on top of torch distributor

2023-04-11 Thread Weichen Xu (Jira)
Weichen Xu created SPARK-43097:
--

 Summary: Implement pyspark ML logistic regression estimator on top 
of torch distributor
 Key: SPARK-43097
 URL: https://issues.apache.org/jira/browse/SPARK-43097
 Project: Spark
  Issue Type: Sub-task
  Components: Connect, ML, PySpark
Affects Versions: 3.5.0
Reporter: Weichen Xu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-43081) Add torch distributor data loader that loads data from spark partition data

2023-04-10 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu reassigned SPARK-43081:
--

Assignee: Weichen Xu

> Add torch distributor data loader that loads data from spark partition data
> ---
>
> Key: SPARK-43081
> URL: https://issues.apache.org/jira/browse/SPARK-43081
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>
> Add torch distributor data loader that loads data from spark partition data.
>  
> We can add 2 APIs like:
> Adds a `TorchDistributor` method API :
> {code:java}
>      def train_on_dataframe(self, train_function, spark_dataframe, *args, 
> **kwargs):
>         """
>         Runs distributed training using provided spark DataFrame as input 
> data.
>         You should ensure the input spark DataFrame have evenly divided 
> partitions,
>         and this method starts a barrier spark job that each spark task in 
> the job
>         process one partition of the input spark DataFrame.
>         Parameters
>         --
>         train_function :
>             Either a PyTorch function, PyTorch Lightning function that 
> launches distributed
>             training. Note that inside the function, you can call
>             `pyspark.ml.torch.distributor.get_spark_partition_data_loader` 
> API to get a torch
>             data loader, the data loader loads data from the corresponding 
> partition of the
>             input spark DataFrame.
>         spark_dataframe :
>             An input spark DataFrame that can be used in PyTorch 
> `train_function` function.
>             See `train_function` argument doc for details.
>         args :
>             `args` need to be the input parameters to `train_function` 
> function. It would look like
>             >>> model = distributor.run(train, 1e-3, 64)
>             where train is a function and 1e-3 and 64 are regular numeric 
> inputs to the function.
>         kwargs :
>             `kwargs` need to be the keyword input parameters to 
> `train_function` function.
>             It would look like
>             >>> model = distributor.run(train, tol=1e-3, max_iter=64)
>             where train is a function that has 2 arguments `tol` and 
> `max_iter`.
>         Returns
>         ---
>             Returns the output of `train_function` called with args inside 
> spark rank 0 task.
>         """{code}
>  
> Adds a loader API:
>  
> {code:java}
>  def get_spark_partition_data_loader(num_samples, batch_size, prefetch=2):
>     """
>     This function must be called inside the `train_function` where 
> `train_function`
>     is the input argument of `TorchDistributor.train_on_dataframe`.
>     The function returns a pytorch data loader that loads data from
>     the corresponding spark partition data.
>     Parameters
>     --
>     num_samples :
>         Number of samples to generate per epoch. If `num_samples` is less 
> than the number of
>         rows in the spark partition, it generates the first `num_samples` rows 
> of
>         the spark partition, if `num_samples` is greater than the number of
>         rows in the spark partition, then after the iterator loaded all rows 
> from the partition,
>         it wraps round back to the first row.
>     batch_size:
>         How many samples per batch to load.
>     prefetch:
>         Number of batches loaded in advance.
>     """{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-43081) Add torch distributor data loader that loads data from spark partition data

2023-04-10 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu updated SPARK-43081:
---
Description: 
Add torch distributor data loader that loads data from spark partition data.

 

We can add 2 APIs like:

 

Adds a `TorchDistributor` method API :

```
    def train_on_dataframe(self, train_function, spark_dataframe, *args, 
**kwargs):
        """
        Runs distributed training using provided spark DataFrame as input data.
        You should ensure the input spark DataFrame have evenly divided 
partitions,
        and this method starts a barrier spark job that each spark task in the 
job
        process one partition of the input spark DataFrame.

        Parameters
        --
        train_function :
            Either a PyTorch function, PyTorch Lightning function that launches 
distributed
            training. Note that inside the function, you can call
            `pyspark.ml.torch.distributor.get_spark_partition_data_loader` API 
to get a torch
            data loader, the data loader loads data from the corresponding 
partition of the
            input spark DataFrame.
        spark_dataframe :
            An input spark DataFrame that can be used in PyTorch 
`train_function` function.
            See `train_function` argument doc for details.
        args :
            `args` need to be the input parameters to `train_function` 
function. It would look like

            >>> model = distributor.run(train, 1e-3, 64)

            where train is a function and 1e-3 and 64 are regular numeric 
inputs to the function.
        kwargs :
            `kwargs` need to be the keyword input parameters to 
`train_function` function.
            It would look like

            >>> model = distributor.run(train, tol=1e-3, max_iter=64)

            where train is a function that has 2 arguments `tol` and `max_iter`.

        Returns
        ---
            Returns the output of `train_function` called with args inside 
spark rank 0 task.
        """
```

 

Adds a loader API:

```
def get_spark_partition_data_loader(num_samples, batch_size, prefetch=2):
    """
    This function must be called inside the `train_function` where 
`train_function`
    is the input argument of `TorchDistributor.train_on_dataframe`.
    The function returns a pytorch data loader that loads data from
    the corresponding spark partition data.

    Parameters
    --
    num_samples :
        Number of samples to generate per epoch. If `num_samples` is less than 
the number of
        rows in the spark partition, it generates the first `num_samples` rows of
        the spark partition, if `num_samples` is greater than the number of
        rows in the spark partition, then after the iterator loaded all rows 
from the partition,
        it wraps round back to the first row.
    batch_size:
        How many samples per batch to load.
    prefetch:
        Number of batches loaded in advance.
    """
```

  was:Add torch distributor data loader that loads data from spark partition 
data.


> Add torch distributor data loader that loads data from spark partition data
> ---
>
> Key: SPARK-43081
> URL: https://issues.apache.org/jira/browse/SPARK-43081
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, ML, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Priority: Major
>
> Add torch distributor data loader that loads data from spark partition data.
>  
> We can add 2 APIs like:
>  
> Adds a `TorchDistributor` method API :
> ```
>     def train_on_dataframe(self, train_function, spark_dataframe, *args, 
> **kwargs):
>         """
>         Runs distributed training using provided spark DataFrame as input 
> data.
>         You should ensure the input spark DataFrame have evenly divided 
> partitions,
>         and this method starts a barrier spark job that each spark task in 
> the job
>         process one partition of the input spark DataFrame.
>         Parameters
>         --
>         train_function :
>             Either a PyTorch function, PyTorch Lightning function that 
> launches distributed
>             training. Note that inside the function, you can call
>             `pyspark.ml.torch.distributor.get_spark_partition_data_loader` 
> API to get a torch
>             data loader, the data loader loads data from the corresponding 
> partition of the
>             input spark DataFrame.
>         spark_dataframe :
>             An input spark DataFrame that can be used in PyTorch 
> `train_function` function.
>             See `train_function` argument doc for details.
>         args :
>             `args` need to be the input parameters to `train_function` 
> function. It would look like
>             >>> model = 

[jira] [Updated] (SPARK-43081) Add torch distributor data loader that loads data from spark partition data

2023-04-10 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu updated SPARK-43081:
---
Description: 
Add torch distributor data loader that loads data from spark partition data.

 

We can add 2 APIs like:

Adds a `TorchDistributor` method API :
{code:java}
     def train_on_dataframe(self, train_function, spark_dataframe, *args, 
**kwargs):
        """
        Runs distributed training using provided spark DataFrame as input data.
        You should ensure the input spark DataFrame have evenly divided 
partitions,
        and this method starts a barrier spark job that each spark task in the 
job
        process one partition of the input spark DataFrame.
        Parameters
        --
        train_function :
            Either a PyTorch function, PyTorch Lightning function that launches 
distributed
            training. Note that inside the function, you can call
            `pyspark.ml.torch.distributor.get_spark_partition_data_loader` API 
to get a torch
            data loader, the data loader loads data from the corresponding 
partition of the
            input spark DataFrame.
        spark_dataframe :
            An input spark DataFrame that can be used in PyTorch 
`train_function` function.
            See `train_function` argument doc for details.
        args :
            `args` need to be the input parameters to `train_function` 
function. It would look like
            >>> model = distributor.run(train, 1e-3, 64)
            where train is a function and 1e-3 and 64 are regular numeric 
inputs to the function.
        kwargs :
            `kwargs` need to be the keyword input parameters to 
`train_function` function.
            It would look like
            >>> model = distributor.run(train, tol=1e-3, max_iter=64)
            where train is a function that has 2 arguments `tol` and `max_iter`.
        Returns
        ---
            Returns the output of `train_function` called with args inside 
spark rank 0 task.
        """{code}
 

Adds a loader API:

 
{code:java}
 def get_spark_partition_data_loader(num_samples, batch_size, prefetch=2):
    """
    This function must be called inside the `train_function` where 
`train_function`
    is the input argument of `TorchDistributor.train_on_dataframe`.
    The function returns a pytorch data loader that loads data from
    the corresponding spark partition data.
    Parameters
    --
    num_samples :
        Number of samples to generate per epoch. If `num_samples` is less than 
the number of
        rows in the spark partition, it generates the first `num_samples` rows of
        the spark partition, if `num_samples` is greater than the number of
        rows in the spark partition, then after the iterator loaded all rows 
from the partition,
        it wraps round back to the first row.
    batch_size:
        How many samples per batch to load.
    prefetch:
        Number of batches loaded in advance.
    """{code}

  was:
Add torch distributor data loader that loads data from spark partition data.

 

We can add 2 APIs like:

 

Adds a `TorchDistributor` method API :

```
    def train_on_dataframe(self, train_function, spark_dataframe, *args, 
**kwargs):
        """
        Runs distributed training using provided spark DataFrame as input data.
        You should ensure the input spark DataFrame have evenly divided 
partitions,
        and this method starts a barrier spark job that each spark task in the 
job
        process one partition of the input spark DataFrame.

        Parameters
        --
        train_function :
            Either a PyTorch function, PyTorch Lightning function that launches 
distributed
            training. Note that inside the function, you can call
            `pyspark.ml.torch.distributor.get_spark_partition_data_loader` API 
to get a torch
            data loader, the data loader loads data from the corresponding 
partition of the
            input spark DataFrame.
        spark_dataframe :
            An input spark DataFrame that can be used in PyTorch 
`train_function` function.
            See `train_function` argument doc for details.
        args :
            `args` need to be the input parameters to `train_function` 
function. It would look like

            >>> model = distributor.run(train, 1e-3, 64)

            where train is a function and 1e-3 and 64 are regular numeric 
inputs to the function.
        kwargs :
            `kwargs` need to be the keyword input parameters to 
`train_function` function.
            It would look like

            >>> model = distributor.run(train, tol=1e-3, max_iter=64)

            where train is a function that has 2 arguments `tol` and `max_iter`.

        Returns
        ---
            Returns the output of `train_function` called with args inside 
spark rank 0 task.
        """
```

 

Adds a loader API:


[jira] [Created] (SPARK-43081) Add torch distributor data loader that loads data from spark partition data

2023-04-10 Thread Weichen Xu (Jira)
Weichen Xu created SPARK-43081:
--

 Summary: Add torch distributor data loader that loads data from 
spark partition data
 Key: SPARK-43081
 URL: https://issues.apache.org/jira/browse/SPARK-43081
 Project: Spark
  Issue Type: Sub-task
  Components: Connect, ML, PySpark
Affects Versions: 3.5.0
Reporter: Weichen Xu


Add torch distributor data loader that loads data from spark partition data.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-42929) make mapInPandas / mapInArrow support "is_barrier"

2023-03-27 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu resolved SPARK-42929.

   Fix Version/s: 3.5.0
Target Version/s: 3.5.0
  Resolution: Fixed

> make mapInPandas / mapInArrow support "is_barrier"
> --
>
> Key: SPARK-42929
> URL: https://issues.apache.org/jira/browse/SPARK-42929
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
> Fix For: 3.5.0
>
>
> make mapInPandas / mapInArrow support "is_barrier"



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-42929) make mapInPandas / mapInArrow support "is_barrier"

2023-03-27 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu reassigned SPARK-42929:
--

Assignee: Weichen Xu

> make mapInPandas / mapInArrow support "is_barrier"
> --
>
> Key: SPARK-42929
> URL: https://issues.apache.org/jira/browse/SPARK-42929
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>
> make mapInPandas / mapInArrow support "is_barrier"



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-42929) make mapInPandas / mapInArrow support "is_barrier"

2023-03-27 Thread Weichen Xu (Jira)
Weichen Xu created SPARK-42929:
--

 Summary: make mapInPandas / mapInArrow support "is_barrier"
 Key: SPARK-42929
 URL: https://issues.apache.org/jira/browse/SPARK-42929
 Project: Spark
  Issue Type: Sub-task
  Components: Connect, PySpark
Affects Versions: 3.5.0
Reporter: Weichen Xu


make mapInPandas / mapInArrow support "is_barrier"



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-42896) Make `mapInPandas` / mapInArrow` support barrier mode execution

2023-03-26 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu reassigned SPARK-42896:
--

Assignee: Weichen Xu

> Make `mapInPandas` / mapInArrow` support barrier mode execution
> ---
>
> Key: SPARK-42896
> URL: https://issues.apache.org/jira/browse/SPARK-42896
> Project: Spark
>  Issue Type: New Feature
>  Components: Pandas API on Spark, PySpark, SQL
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>
> Make `mapInPandas` / `mapInArrow` support barrier mode execution



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-42896) Make `mapInPandas` / mapInArrow` support barrier mode execution

2023-03-26 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu resolved SPARK-42896.

Fix Version/s: 3.5.0
   Resolution: Fixed

Issue resolved by pull request 40520
[https://github.com/apache/spark/pull/40520]

> Make `mapInPandas` / mapInArrow` support barrier mode execution
> ---
>
> Key: SPARK-42896
> URL: https://issues.apache.org/jira/browse/SPARK-42896
> Project: Spark
>  Issue Type: New Feature
>  Components: Pandas API on Spark, PySpark, SQL
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
> Fix For: 3.5.0
>
>
> Make `mapInPandas` / `mapInArrow` support barrier mode execution



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-42896) Make `mapInPandas` / mapInArrow` support barrier mode execution

2023-03-22 Thread Weichen Xu (Jira)
Weichen Xu created SPARK-42896:
--

 Summary: Make `mapInPandas` / mapInArrow` support barrier mode 
execution
 Key: SPARK-42896
 URL: https://issues.apache.org/jira/browse/SPARK-42896
 Project: Spark
  Issue Type: New Feature
  Components: Pandas API on Spark, PySpark, SQL
Affects Versions: 3.5.0
Reporter: Weichen Xu


Make `mapInPandas` / `mapInArrow` support barrier mode execution
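A small sketch of what the resulting API could look like; the `barrier` flag is the new piece this ticket asks for, while the rest is existing `mapInPandas` usage.

{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(0, 8, numPartitions=2)

def add_one(batches):
    # `batches` is an iterator of pandas DataFrames for one partition.
    for pdf in batches:
        yield pdf.assign(id=pdf["id"] + 1)

# With barrier=True, all tasks of the stage start together (barrier execution
# mode), which distributed training frameworks such as the torch distributor
# rely on.
result = df.mapInPandas(add_one, schema="id long", barrier=True)
result.show()
{code}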



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-42732) Support spark connect session getActiveSession

2023-03-14 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu reassigned SPARK-42732:
--

Assignee: Weichen Xu

> Support spark connect session getActiveSession
> --
>
> Key: SPARK-42732
> URL: https://issues.apache.org/jira/browse/SPARK-42732
> Project: Spark
>  Issue Type: New Feature
>  Components: Connect, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
>
> Support spark connect session getActiveSession method.
> Spark connect ML needs this API to get active session in some cases (e.g. 
> fetching model attributes from server side).
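A minimal sketch of the intended usage; the exact import path and return behavior are assumptions based on this ticket's description, and it presumes a Spark Connect session was already created in the process (e.g. via SparkSession.builder.remote(...).getOrCreate()).

{code:python}
from pyspark.sql.connect.session import SparkSession

# Library code (such as Spark Connect ML) can pick up the session the
# application created, instead of having it passed in explicitly.
active = SparkSession.getActiveSession()
if active is not None:
    print(active.range(1).count())
{code}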



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-42732) Support spark connect session getActiveSession

2023-03-14 Thread Weichen Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu resolved SPARK-42732.

Fix Version/s: 3.5.0
   Resolution: Fixed

Issue resolved by pull request 40353
[https://github.com/apache/spark/pull/40353]

> Support spark connect session getActiveSession
> --
>
> Key: SPARK-42732
> URL: https://issues.apache.org/jira/browse/SPARK-42732
> Project: Spark
>  Issue Type: New Feature
>  Components: Connect, PySpark
>Affects Versions: 3.5.0
>Reporter: Weichen Xu
>Assignee: Weichen Xu
>Priority: Major
> Fix For: 3.5.0
>
>
> Support spark connect session getActiveSession method.
> Spark connect ML needs this API to get active session in some cases (e.g. 
> fetching model attributes from server side).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


