wbo4958 commented on code in PR #48791: URL: https://github.com/apache/spark/pull/48791#discussion_r1877543825
########## sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/Serializer.scala: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.ml + +import org.apache.spark.connect.proto +import org.apache.spark.ml.linalg.{DenseMatrix, DenseVector, SparseMatrix, SparseVector} +import org.apache.spark.ml.param.Params +import org.apache.spark.sql.Dataset +import org.apache.spark.sql.connect.common.LiteralValueProtoConverter +import org.apache.spark.sql.connect.service.SessionHolder + +private[ml] object Serializer { + + /** + * Serialize the ML parameters, currently only support Vector/Matrix and literals Review Comment: No, for now I haven't seen other types that need to be supported for spark.ml, but we will probably need more once we start supporting other estimators. ########## sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala: ########## @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.sql.connect.ml + +import java.util.ServiceLoader + +import scala.collection.immutable.HashSet +import scala.jdk.CollectionConverters.{IterableHasAsScala, MapHasAsScala} + +import org.apache.commons.lang3.reflect.MethodUtils.{invokeMethod, invokeStaticMethod} + +import org.apache.spark.connect.proto +import org.apache.spark.ml.{Estimator, Transformer} +import org.apache.spark.ml.linalg.{Matrices, Matrix, Vector, Vectors} +import org.apache.spark.ml.param.Params +import org.apache.spark.ml.util.MLWritable +import org.apache.spark.sql.{DataFrame, Dataset} +import org.apache.spark.sql.connect.common.LiteralValueProtoConverter +import org.apache.spark.sql.connect.planner.SparkConnectPlanner +import org.apache.spark.sql.connect.service.SessionHolder +import org.apache.spark.util.Utils + +private[ml] object MLUtils { + + private lazy val estimators: Map[String, Class[_]] = { + val loader = Utils.getContextOrSparkClassLoader + val serviceLoader = ServiceLoader.load(classOf[Estimator[_]], loader) + val providers = serviceLoader.asScala.toList + providers.map(est => est.getClass.getName -> est.getClass).toMap + } + + private lazy val transformers: Map[String, Class[_]] = { + val loader = Utils.getContextOrSparkClassLoader + val serviceLoader = ServiceLoader.load(classOf[Transformer], loader) + val providers = serviceLoader.asScala.toList + providers.map(est => est.getClass.getName -> est.getClass).toMap + } + + def deserializeVector(vector: proto.Vector): Vector = { + if (vector.hasDense) { + val values = vector.getDense.getValueList.asScala.map(_.toDouble).toArray + Vectors.dense(values) + } else { + val size = vector.getSparse.getSize + val indices = vector.getSparse.getIndexList.asScala.map(_.toInt).toArray + val values = vector.getSparse.getValueList.asScala.map(_.toDouble).toArray + Vectors.sparse(size, indices, values) + } + } + + def deserializeMatrix(matrix: proto.Matrix): Matrix = { + if (matrix.hasDense) { + val values = matrix.getDense.getValueList.asScala.map(_.toDouble).toArray + Matrices.dense(matrix.getDense.getNumRows, matrix.getDense.getNumCols, values) + } else { + val sparse = matrix.getSparse + val colPtrs = sparse.getColptrList.asScala.map(_.toInt).toArray + val rowIndices = sparse.getRowIndexList.asScala.map(_.toInt).toArray + val values = sparse.getValueList.asScala.map(_.toDouble).toArray + Matrices.sparse(sparse.getNumRows, sparse.getNumCols, colPtrs, rowIndices, values) + } + } + + def setInstanceParams(instance: Params, params: proto.MlParams): Unit = { + params.getParamsMap.asScala.foreach { case (name, paramProto) => + val p = instance.getParam(name) + val value = if (paramProto.hasLiteral) { + convertParamValue( + p.paramValueClassTag.runtimeClass, + LiteralValueProtoConverter.toCatalystValue(paramProto.getLiteral)) + } else if (paramProto.hasVector) { + deserializeVector(paramProto.getVector) + } else if (paramProto.hasMatrix) { + deserializeMatrix(paramProto.getMatrix) + } else { + throw MlUnsupportedException(s"Unsupported parameter type for ${name}") + } + instance.set(p, value) + } + } + + private def convertArray(paramType: Class[_], array: Array[_]): Array[_] = { + if (paramType == classOf[Byte]) { + array.map(_.asInstanceOf[Byte]) + } else if (paramType == classOf[Short]) { + array.map(_.asInstanceOf[Short]) + } else if (paramType == classOf[Int]) { + array.map(_.asInstanceOf[Int]) + } else if (paramType == classOf[Long]) { + array.map(_.asInstanceOf[Long]) + } else if (paramType == classOf[Float]) { + 
array.map(_.asInstanceOf[Float]) + } else if (paramType == classOf[Double]) { + array.map(_.asInstanceOf[Double]) + } else if (paramType == classOf[String]) { + array.map(_.asInstanceOf[String]) + } else { + array + } + } + + private def convertParamValue(paramType: Class[_], value: Any): Any = { + // Some cases the param type might be mismatched with the value type. + // Because in python side we only have int / float type for numeric params. + // e.g.: + // param type is Int but client sends a Long type. + // param type is Long but client sends a Int type. + // param type is Float but client sends a Double type. + // param type is Array[Int] but client sends a Array[Long] type. + // param type is Array[Float] but client sends a Array[Double] type. + // param type is Array[Array[Int]] but client sends a Array[Array[Long]] type. + // param type is Array[Array[Float]] but client sends a Array[Array[Double]] type. + if (paramType == classOf[Byte]) { + value.asInstanceOf[java.lang.Number].byteValue() + } else if (paramType == classOf[Short]) { + value.asInstanceOf[java.lang.Number].shortValue() + } else if (paramType == classOf[Int]) { + value.asInstanceOf[java.lang.Number].intValue() + } else if (paramType == classOf[Long]) { + value.asInstanceOf[java.lang.Number].longValue() + } else if (paramType == classOf[Float]) { + value.asInstanceOf[java.lang.Number].floatValue() + } else if (paramType == classOf[Double]) { + value.asInstanceOf[java.lang.Number].doubleValue() + } else if (paramType.isArray) { + val compType = paramType.getComponentType + val array = value.asInstanceOf[Array[_]].map { e => + convertParamValue(compType, e) + } + convertArray(compType, array) + } else { + value + } + } + + def parseRelationProto(relation: proto.Relation, sessionHolder: SessionHolder): DataFrame = { + val planner = new SparkConnectPlanner(sessionHolder) + val plan = planner.transformRelation(relation) + Dataset.ofRows(sessionHolder.session, plan) + } + + /** + * Get the Estimator instance according to the proto information + * + * @param operator + * MlOperator information + * @param params + * The optional parameters of the estimator + * @return + * the estimator + */ + def getEstimator(operator: proto.MlOperator, params: Option[proto.MlParams]): Estimator[_] = { + // TODO support plugin + // Get the estimator according to the operator name + val name = operator.getName + if (estimators.isEmpty || !estimators.contains(name)) { + throw MlUnsupportedException(s"Unsupported Estimator, found ${name}") + } + val uid = operator.getUid + val estimator: Estimator[_] = estimators(name) + .getConstructor(classOf[String]) + .newInstance(uid) + .asInstanceOf[Estimator[_]] + + // Set parameters for the estimator + params.foreach(p => MLUtils.setInstanceParams(estimator, p)) + estimator + } + + def load(className: String, path: String): Object = { + // scalastyle:off classforname + val clazz = Class.forName(className) + // scalastyle:on classforname + invokeStaticMethod(clazz, "load", path) + } + + /** + * Get the transformer instance according to the transform proto + * + * @param transformProto + * transform proto + * @return + * a transformer + */ + def getTransformer(transformProto: proto.MlRelation.Transform): Transformer = { + // Get the transformer name Review Comment: sounds good. Done. 
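A note on the `convertParamValue` comment in the hunk above: the Python client only distinguishes int and float for numeric params, so a value can arrive with a wider JVM type than the target `Param` expects (e.g. a `java.lang.Long` for an `Int` param). A minimal, self-contained sketch of that kind of narrowing (illustrative only, not the PR's exact code):

```scala
object ParamCoercionSketch {
  // Mimics, in simplified form, the scalar narrowing convertParamValue performs.
  def coerce(paramType: Class[_], value: Any): Any = {
    if (paramType == classOf[Int]) {
      value.asInstanceOf[java.lang.Number].intValue()
    } else if (paramType == classOf[Float]) {
      value.asInstanceOf[java.lang.Number].floatValue()
    } else {
      value
    }
  }

  def main(args: Array[String]): Unit = {
    // A Python int is delivered as a Long, but a JVM-side Int param needs an Int value.
    val fromClient: Any = 10L
    val coerced = coerce(classOf[Int], fromClient)
    println(coerced.getClass) // class java.lang.Integer
  }
}
```

The PR's actual `convertParamValue` additionally covers Byte/Short/Long/Double and recurses into arrays, as shown in the hunk above.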
########## sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala: ########## @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.ml + +import java.util.ServiceLoader + +import scala.collection.immutable.HashSet +import scala.jdk.CollectionConverters.{IterableHasAsScala, MapHasAsScala} + +import org.apache.commons.lang3.reflect.MethodUtils.{invokeMethod, invokeStaticMethod} + +import org.apache.spark.connect.proto +import org.apache.spark.ml.{Estimator, Transformer} +import org.apache.spark.ml.linalg.{Matrices, Matrix, Vector, Vectors} +import org.apache.spark.ml.param.Params +import org.apache.spark.ml.util.MLWritable +import org.apache.spark.sql.{DataFrame, Dataset} +import org.apache.spark.sql.connect.common.LiteralValueProtoConverter +import org.apache.spark.sql.connect.planner.SparkConnectPlanner +import org.apache.spark.sql.connect.service.SessionHolder +import org.apache.spark.util.Utils + +private[ml] object MLUtils { + + private lazy val estimators: Map[String, Class[_]] = { + val loader = Utils.getContextOrSparkClassLoader + val serviceLoader = ServiceLoader.load(classOf[Estimator[_]], loader) + val providers = serviceLoader.asScala.toList + providers.map(est => est.getClass.getName -> est.getClass).toMap + } + + private lazy val transformers: Map[String, Class[_]] = { + val loader = Utils.getContextOrSparkClassLoader + val serviceLoader = ServiceLoader.load(classOf[Transformer], loader) + val providers = serviceLoader.asScala.toList + providers.map(est => est.getClass.getName -> est.getClass).toMap + } + + def deserializeVector(vector: proto.Vector): Vector = { + if (vector.hasDense) { + val values = vector.getDense.getValueList.asScala.map(_.toDouble).toArray + Vectors.dense(values) + } else { + val size = vector.getSparse.getSize + val indices = vector.getSparse.getIndexList.asScala.map(_.toInt).toArray + val values = vector.getSparse.getValueList.asScala.map(_.toDouble).toArray + Vectors.sparse(size, indices, values) + } + } + + def deserializeMatrix(matrix: proto.Matrix): Matrix = { + if (matrix.hasDense) { + val values = matrix.getDense.getValueList.asScala.map(_.toDouble).toArray + Matrices.dense(matrix.getDense.getNumRows, matrix.getDense.getNumCols, values) + } else { + val sparse = matrix.getSparse + val colPtrs = sparse.getColptrList.asScala.map(_.toInt).toArray + val rowIndices = sparse.getRowIndexList.asScala.map(_.toInt).toArray + val values = sparse.getValueList.asScala.map(_.toDouble).toArray + Matrices.sparse(sparse.getNumRows, sparse.getNumCols, colPtrs, rowIndices, values) + } + } + + def setInstanceParams(instance: Params, params: proto.MlParams): Unit = { + params.getParamsMap.asScala.foreach { case (name, paramProto) => + val p = 
instance.getParam(name) + val value = if (paramProto.hasLiteral) { + convertParamValue( + p.paramValueClassTag.runtimeClass, + LiteralValueProtoConverter.toCatalystValue(paramProto.getLiteral)) + } else if (paramProto.hasVector) { + deserializeVector(paramProto.getVector) + } else if (paramProto.hasMatrix) { + deserializeMatrix(paramProto.getMatrix) + } else { + throw MlUnsupportedException(s"Unsupported parameter type for ${name}") + } + instance.set(p, value) + } + } + + private def convertArray(paramType: Class[_], array: Array[_]): Array[_] = { + if (paramType == classOf[Byte]) { + array.map(_.asInstanceOf[Byte]) + } else if (paramType == classOf[Short]) { + array.map(_.asInstanceOf[Short]) + } else if (paramType == classOf[Int]) { + array.map(_.asInstanceOf[Int]) + } else if (paramType == classOf[Long]) { + array.map(_.asInstanceOf[Long]) + } else if (paramType == classOf[Float]) { + array.map(_.asInstanceOf[Float]) + } else if (paramType == classOf[Double]) { + array.map(_.asInstanceOf[Double]) + } else if (paramType == classOf[String]) { + array.map(_.asInstanceOf[String]) + } else { + array + } + } + + private def convertParamValue(paramType: Class[_], value: Any): Any = { + // Some cases the param type might be mismatched with the value type. + // Because in python side we only have int / float type for numeric params. + // e.g.: + // param type is Int but client sends a Long type. + // param type is Long but client sends a Int type. + // param type is Float but client sends a Double type. + // param type is Array[Int] but client sends a Array[Long] type. + // param type is Array[Float] but client sends a Array[Double] type. + // param type is Array[Array[Int]] but client sends a Array[Array[Long]] type. + // param type is Array[Array[Float]] but client sends a Array[Array[Double]] type. 
+ if (paramType == classOf[Byte]) { + value.asInstanceOf[java.lang.Number].byteValue() + } else if (paramType == classOf[Short]) { + value.asInstanceOf[java.lang.Number].shortValue() + } else if (paramType == classOf[Int]) { + value.asInstanceOf[java.lang.Number].intValue() + } else if (paramType == classOf[Long]) { + value.asInstanceOf[java.lang.Number].longValue() + } else if (paramType == classOf[Float]) { + value.asInstanceOf[java.lang.Number].floatValue() + } else if (paramType == classOf[Double]) { + value.asInstanceOf[java.lang.Number].doubleValue() + } else if (paramType.isArray) { + val compType = paramType.getComponentType + val array = value.asInstanceOf[Array[_]].map { e => + convertParamValue(compType, e) + } + convertArray(compType, array) + } else { + value + } + } + + def parseRelationProto(relation: proto.Relation, sessionHolder: SessionHolder): DataFrame = { + val planner = new SparkConnectPlanner(sessionHolder) + val plan = planner.transformRelation(relation) + Dataset.ofRows(sessionHolder.session, plan) + } + + /** + * Get the Estimator instance according to the proto information + * + * @param operator + * MlOperator information + * @param params + * The optional parameters of the estimator + * @return + * the estimator + */ + def getEstimator(operator: proto.MlOperator, params: Option[proto.MlParams]): Estimator[_] = { + // TODO support plugin + // Get the estimator according to the operator name + val name = operator.getName + if (estimators.isEmpty || !estimators.contains(name)) { + throw MlUnsupportedException(s"Unsupported Estimator, found ${name}") + } + val uid = operator.getUid + val estimator: Estimator[_] = estimators(name) + .getConstructor(classOf[String]) + .newInstance(uid) + .asInstanceOf[Estimator[_]] + + // Set parameters for the estimator + params.foreach(p => MLUtils.setInstanceParams(estimator, p)) + estimator + } + + def load(className: String, path: String): Object = { + // scalastyle:off classforname + val clazz = Class.forName(className) Review Comment: Thx for the suggestion about `SparkClassUtils.classForName`. > Other question, why are you not using the registry for this. Good catch, it seems we need to use the registry. ########## sql/connect/common/src/main/protobuf/spark/connect/relations.proto: ########## @@ -97,13 +98,60 @@ message Relation { // Catalog API (experimental / unstable) Catalog catalog = 200; + // ML relation + MlRelation ml_relation = 300; + // This field is used to mark extensions to the protocol. When plugins generate arbitrary // relations they can add them here. During the planning the correct resolution is done. google.protobuf.Any extension = 998; Unknown unknown = 999; } } +// Relation to represent ML world +message MlRelation { Review Comment: Not too much, just group the ML things together. The existing relations are SQL related. I just didn't want to mix ML things into SQL. But definitely, we could put them in the Relations. ########## sql/connect/common/src/main/protobuf/spark/connect/relations.proto: ########## @@ -97,13 +98,60 @@ message Relation { // Catalog API (experimental / unstable) Catalog catalog = 200; + // ML relation + MlRelation ml_relation = 300; + // This field is used to mark extensions to the protocol. When plugins generate arbitrary // relations they can add them here. During the planning the correct resolution is done. 
google.protobuf.Any extension = 998; Unknown unknown = 999; } } +// Relation to represent ML world +message MlRelation { + oneof ml_type { + Transform transform = 1; + FetchAttr fetch_attr = 2; + } + // Relation to represent transform(input) of the operator + // which could be a cached model or a new transformer + message Transform { + oneof operator { + // Object reference + ObjectRef obj_ref = 1; + // Could be an ML transformer like VectorAssembler + MlOperator transformer = 2; + } + // the input dataframe + Relation input = 3; + // the operator specific parameters + MlParams params = 4; + } +} + +// Message for fetching attribute from object on the server side. +// FetchAttr can be represented as a Relation or a ML command +// Eg, model.coefficients, model.summary.weightedPrecision +// or model.summary.roc which returns a DataFrame +message FetchAttr { + // (Required) reference to the object on the server side or + // the intermediate attribute of the model. eg, "model.summary" + ObjectRef obj_ref = 1; + // (Required) the method name. Eg, "coefficients" of the model + // and "weightedPrecision" of "model.summary" + string method = 2; + // (Optional) the parameters of the method + repeated Args args = 3; + + message Args { Review Comment: I don't think it's necessary to support named arguments, given that we're leveraging reflection to do the "invoking" ########## sql/connect/common/src/main/protobuf/spark/connect/relations.proto: ########## @@ -97,13 +98,60 @@ message Relation { // Catalog API (experimental / unstable) Catalog catalog = 200; + // ML relation + MlRelation ml_relation = 300; + // This field is used to mark extensions to the protocol. When plugins generate arbitrary // relations they can add them here. During the planning the correct resolution is done. google.protobuf.Any extension = 998; Unknown unknown = 999; } } +// Relation to represent ML world +message MlRelation { + oneof ml_type { + Transform transform = 1; + FetchAttr fetch_attr = 2; + } + // Relation to represent transform(input) of the operator + // which could be a cached model or a new transformer + message Transform { + oneof operator { + // Object reference + ObjectRef obj_ref = 1; + // Could be an ML transformer like VectorAssembler + MlOperator transformer = 2; + } + // the input dataframe + Relation input = 3; + // the operator specific parameters + MlParams params = 4; + } +} + +// Message for fetching attribute from object on the server side. +// FetchAttr can be represented as a Relation or a ML command Review Comment: Hmm, Some attributes return the literal results, while the other return the DataFrame. ########## sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLHandler.scala: ########## @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.ml + +import scala.jdk.CollectionConverters.CollectionHasAsScala + +import org.apache.spark.connect.proto +import org.apache.spark.internal.Logging +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.param.ParamMap +import org.apache.spark.ml.util.{MLWritable, Summary} +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.connect.common.LiteralValueProtoConverter +import org.apache.spark.sql.connect.ml.Serializer.deserializeMethodArguments +import org.apache.spark.sql.connect.service.SessionHolder + +/** + * Helper function to get the attribute from an object by reflection + */ +private class AttributeHelper( + val sessionHolder: SessionHolder, + val objIdentifier: String, + val method: Option[String], + val argValues: Array[Object] = Array.empty, + val argClasses: Array[Class[_]] = Array.empty) { + + private val methodChain = method.map(n => s"$objIdentifier.$n").getOrElse(objIdentifier) + private val methodChains = methodChain.split("\\.") + private val modelId = methodChains.head + + protected lazy val instance = sessionHolder.mlCache.get(modelId) + private lazy val methods = methodChains.slice(1, methodChains.length) + + def getAttribute: Any = { + assert(methods.length >= 1) + if (argValues.length == 0) { + methods.foldLeft(instance) { (obj, attribute) => + MLUtils.invokeMethodAllowed(obj, attribute) + } + } else { + val lastMethod = methods.last + if (methods.length == 1) { + MLUtils.invokeMethodAllowed(instance, lastMethod, argValues, argClasses) + } else { + val prevMethods = methods.slice(0, methods.length - 1) + val finalObj = prevMethods.foldLeft(instance) { (obj, attribute) => + MLUtils.invokeMethodAllowed(obj, attribute) + } + MLUtils.invokeMethodAllowed(finalObj, lastMethod, argValues, argClasses) + } + } + } +} + +private class ModelAttributeHelper( + sessionHolder: SessionHolder, + objIdentifier: String, + method: Option[String], + argValues: Array[Object] = Array.empty, + argClasses: Array[Class[_]] = Array.empty) + extends AttributeHelper(sessionHolder, objIdentifier, method, argValues, argClasses) { + + def transform(relation: proto.MlRelation.Transform): DataFrame = { + // Create a copied model to avoid concurrently modify model params. 
+ val model = instance.asInstanceOf[Model[_]] + val copiedModel = model.copy(ParamMap.empty).asInstanceOf[Model[_]] + MLUtils.setInstanceParams(copiedModel, relation.getParams) + val inputDF = MLUtils.parseRelationProto(relation.getInput, sessionHolder) + copiedModel.transform(inputDF) + } +} + +private object AttributeHelper { + def apply( + sessionHolder: SessionHolder, + modelId: String, + method: Option[String] = None, + args: Array[proto.FetchAttr.Args] = Array.empty): AttributeHelper = { + val tmp = deserializeMethodArguments(args, sessionHolder) + val argValues = tmp.map(_._1) + val argClasses = tmp.map(_._2) + new AttributeHelper(sessionHolder, modelId, method, argValues, argClasses) + } +} + +private object ModelAttributeHelper { + def apply( + sessionHolder: SessionHolder, + modelId: String, + method: Option[String] = None, + args: Array[proto.FetchAttr.Args] = Array.empty): ModelAttributeHelper = { + val tmp = deserializeMethodArguments(args, sessionHolder) + val argValues = tmp.map(_._1) + val argClasses = tmp.map(_._2) + new ModelAttributeHelper(sessionHolder, modelId, method, argValues, argClasses) + } +} + +// MLHandler is a utility to group all ML operations +object MLHandler extends Logging { + def handleMlCommand( + sessionHolder: SessionHolder, + mlCommand: proto.MlCommand): proto.MlCommandResult = { + + val mlCache = sessionHolder.mlCache + + mlCommand.getCommandCase match { + case proto.MlCommand.CommandCase.FIT => + val fitCmd = mlCommand.getFit + val estimatorProto = fitCmd.getEstimator + assert(estimatorProto.getType == proto.MlOperator.OperatorType.ESTIMATOR) + + val dataset = MLUtils.parseRelationProto(fitCmd.getDataset, sessionHolder) + val estimator = MLUtils.getEstimator(estimatorProto, Some(fitCmd.getParams)) + val model = estimator.fit(dataset).asInstanceOf[Model[_]] + val id = mlCache.register(model) + proto.MlCommandResult + .newBuilder() + .setOperatorInfo( + proto.MlCommandResult.MlOperatorInfo + .newBuilder() + .setObjRef(proto.ObjectRef.newBuilder().setId(id))) + .build() + + case proto.MlCommand.CommandCase.FETCH_ATTR => + val args = mlCommand.getFetchAttr.getArgsList.asScala.toArray + val helper = AttributeHelper( + sessionHolder, + mlCommand.getFetchAttr.getObjRef.getId, + Option(mlCommand.getFetchAttr.getMethod), + args) + val attrResult = helper.getAttribute + attrResult match { + case s: Summary => + val id = mlCache.register(s) + proto.MlCommandResult.newBuilder().setSummary(id).build() + case _ => + val param = Serializer.serializeParam(attrResult) + proto.MlCommandResult.newBuilder().setParam(param).build() + } + + case proto.MlCommand.CommandCase.DELETE => + val modelId = mlCommand.getDelete.getObjRef.getId + var result = false + if (!modelId.contains(".")) { + mlCache.remove(modelId) + result = true + } + proto.MlCommandResult + .newBuilder() + .setParam( + proto.Param + .newBuilder() + .setLiteral(LiteralValueProtoConverter.toLiteralProto(result)) + .build()) + .build() + + case proto.MlCommand.CommandCase.WRITE => + mlCommand.getWrite.getTypeCase match { + case proto.MlCommand.Writer.TypeCase.OBJ_REF => // save a model + val modelId = mlCommand.getWrite.getObjRef.getId + val model = mlCache.get(modelId).asInstanceOf[Model[_]] + val copiedModel = model.copy(ParamMap.empty).asInstanceOf[Model[_]] + MLUtils.setInstanceParams(copiedModel, mlCommand.getWrite.getParams) + + copiedModel match { + case m: MLWritable => MLUtils.write(m, mlCommand.getWrite) + case other => throw MlUnsupportedException(s"$other is not writable") + } + + // save an 
estimator/evaluator/transformer + case proto.MlCommand.Writer.TypeCase.OPERATOR => + val writer = mlCommand.getWrite + if (writer.getOperator.getType == proto.MlOperator.OperatorType.ESTIMATOR) { + val estimator = MLUtils.getEstimator(writer.getOperator, Some(writer.getParams)) + estimator match { + case m: MLWritable => MLUtils.write(m, mlCommand.getWrite) + case other => throw MlUnsupportedException(s"Estimator $other is not writable") + } + } else { + throw MlUnsupportedException(s"${writer.getOperator.getName} not supported") + } + + case other => throw MlUnsupportedException(s"$other not supported") + } + proto.MlCommandResult.newBuilder().build() + + case proto.MlCommand.CommandCase.READ => + val operator = mlCommand.getRead.getOperator + val name = operator.getName + val path = mlCommand.getRead.getPath + + if (operator.getType == proto.MlOperator.OperatorType.MODEL) { + val model = MLUtils.load(name, path).asInstanceOf[Model[_]] + val id = mlCache.register(model) + proto.MlCommandResult + .newBuilder() + .setOperatorInfo( + proto.MlCommandResult.MlOperatorInfo + .newBuilder() + .setObjRef(proto.ObjectRef.newBuilder().setId(id)) + .setUid(model.uid) + .setParams(Serializer.serializeParams(model))) + .build() + + } else if (operator.getType == proto.MlOperator.OperatorType.ESTIMATOR) { + val estimator = MLUtils.load(name, path).asInstanceOf[Estimator[_]] + proto.MlCommandResult + .newBuilder() + .setOperatorInfo( + proto.MlCommandResult.MlOperatorInfo + .newBuilder() + .setName(name) + .setUid(estimator.uid) + .setParams(Serializer.serializeParams(estimator))) + .build() + } else { + throw MlUnsupportedException(s"${operator.getType} not supported") + } + + case other => throw MlUnsupportedException(s"$other not supported") + } + } + + def transformMLRelation(relation: proto.MlRelation, sessionHolder: SessionHolder): DataFrame = { Review Comment: Hmm, This function indeed returns the Dataframe, but the caller will return its logicalPlan from https://github.com/apache/spark/pull/48791/files#diff-db5ed413eb10608faef1340838a236cbbac5ca71ad5193a61cc953c66a01c12fR234 ``` scala case proto.Command.CommandTypeCase.ML_COMMAND => handleMlCommand(command.getMlCommand, responseObserver) ``` ########## sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala: ########## @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.connect.ml + +import java.util.ServiceLoader + +import scala.collection.immutable.HashSet +import scala.jdk.CollectionConverters.{IterableHasAsScala, MapHasAsScala} + +import org.apache.commons.lang3.reflect.MethodUtils.{invokeMethod, invokeStaticMethod} + +import org.apache.spark.connect.proto +import org.apache.spark.ml.{Estimator, Transformer} +import org.apache.spark.ml.linalg.{Matrices, Matrix, Vector, Vectors} +import org.apache.spark.ml.param.Params +import org.apache.spark.ml.util.MLWritable +import org.apache.spark.sql.{DataFrame, Dataset} +import org.apache.spark.sql.connect.common.LiteralValueProtoConverter +import org.apache.spark.sql.connect.planner.SparkConnectPlanner +import org.apache.spark.sql.connect.service.SessionHolder +import org.apache.spark.util.Utils + +private[ml] object MLUtils { + + private lazy val estimators: Map[String, Class[_]] = { + val loader = Utils.getContextOrSparkClassLoader + val serviceLoader = ServiceLoader.load(classOf[Estimator[_]], loader) + val providers = serviceLoader.asScala.toList + providers.map(est => est.getClass.getName -> est.getClass).toMap + } + + private lazy val transformers: Map[String, Class[_]] = { + val loader = Utils.getContextOrSparkClassLoader + val serviceLoader = ServiceLoader.load(classOf[Transformer], loader) + val providers = serviceLoader.asScala.toList + providers.map(est => est.getClass.getName -> est.getClass).toMap + } + + def deserializeVector(vector: proto.Vector): Vector = { + if (vector.hasDense) { + val values = vector.getDense.getValueList.asScala.map(_.toDouble).toArray + Vectors.dense(values) + } else { + val size = vector.getSparse.getSize + val indices = vector.getSparse.getIndexList.asScala.map(_.toInt).toArray + val values = vector.getSparse.getValueList.asScala.map(_.toDouble).toArray + Vectors.sparse(size, indices, values) + } + } + + def deserializeMatrix(matrix: proto.Matrix): Matrix = { + if (matrix.hasDense) { + val values = matrix.getDense.getValueList.asScala.map(_.toDouble).toArray + Matrices.dense(matrix.getDense.getNumRows, matrix.getDense.getNumCols, values) + } else { + val sparse = matrix.getSparse + val colPtrs = sparse.getColptrList.asScala.map(_.toInt).toArray + val rowIndices = sparse.getRowIndexList.asScala.map(_.toInt).toArray + val values = sparse.getValueList.asScala.map(_.toDouble).toArray + Matrices.sparse(sparse.getNumRows, sparse.getNumCols, colPtrs, rowIndices, values) + } + } + + def setInstanceParams(instance: Params, params: proto.MlParams): Unit = { + params.getParamsMap.asScala.foreach { case (name, paramProto) => + val p = instance.getParam(name) + val value = if (paramProto.hasLiteral) { + convertParamValue( + p.paramValueClassTag.runtimeClass, + LiteralValueProtoConverter.toCatalystValue(paramProto.getLiteral)) + } else if (paramProto.hasVector) { + deserializeVector(paramProto.getVector) + } else if (paramProto.hasMatrix) { + deserializeMatrix(paramProto.getMatrix) + } else { + throw MlUnsupportedException(s"Unsupported parameter type for ${name}") + } + instance.set(p, value) + } + } + + private def convertArray(paramType: Class[_], array: Array[_]): Array[_] = { + if (paramType == classOf[Byte]) { + array.map(_.asInstanceOf[Byte]) + } else if (paramType == classOf[Short]) { + array.map(_.asInstanceOf[Short]) + } else if (paramType == classOf[Int]) { + array.map(_.asInstanceOf[Int]) + } else if (paramType == classOf[Long]) { + array.map(_.asInstanceOf[Long]) + } else if (paramType == classOf[Float]) { + 
array.map(_.asInstanceOf[Float]) + } else if (paramType == classOf[Double]) { + array.map(_.asInstanceOf[Double]) + } else if (paramType == classOf[String]) { + array.map(_.asInstanceOf[String]) + } else { + array + } + } + + private def convertParamValue(paramType: Class[_], value: Any): Any = { + // Some cases the param type might be mismatched with the value type. + // Because in python side we only have int / float type for numeric params. + // e.g.: + // param type is Int but client sends a Long type. + // param type is Long but client sends a Int type. + // param type is Float but client sends a Double type. + // param type is Array[Int] but client sends a Array[Long] type. + // param type is Array[Float] but client sends a Array[Double] type. + // param type is Array[Array[Int]] but client sends a Array[Array[Long]] type. + // param type is Array[Array[Float]] but client sends a Array[Array[Double]] type. + if (paramType == classOf[Byte]) { + value.asInstanceOf[java.lang.Number].byteValue() + } else if (paramType == classOf[Short]) { + value.asInstanceOf[java.lang.Number].shortValue() + } else if (paramType == classOf[Int]) { + value.asInstanceOf[java.lang.Number].intValue() + } else if (paramType == classOf[Long]) { + value.asInstanceOf[java.lang.Number].longValue() + } else if (paramType == classOf[Float]) { + value.asInstanceOf[java.lang.Number].floatValue() + } else if (paramType == classOf[Double]) { + value.asInstanceOf[java.lang.Number].doubleValue() + } else if (paramType.isArray) { + val compType = paramType.getComponentType + val array = value.asInstanceOf[Array[_]].map { e => + convertParamValue(compType, e) + } + convertArray(compType, array) + } else { + value + } + } + + def parseRelationProto(relation: proto.Relation, sessionHolder: SessionHolder): DataFrame = { + val planner = new SparkConnectPlanner(sessionHolder) + val plan = planner.transformRelation(relation) + Dataset.ofRows(sessionHolder.session, plan) + } + + /** + * Get the Estimator instance according to the proto information + * + * @param operator + * MlOperator information + * @param params + * The optional parameters of the estimator + * @return + * the estimator + */ + def getEstimator(operator: proto.MlOperator, params: Option[proto.MlParams]): Estimator[_] = { + // TODO support plugin + // Get the estimator according to the operator name + val name = operator.getName + if (estimators.isEmpty || !estimators.contains(name)) { + throw MlUnsupportedException(s"Unsupported Estimator, found ${name}") + } + val uid = operator.getUid + val estimator: Estimator[_] = estimators(name) Review Comment: Typically, we just construct the estimator once, use it to train a dataset, and return a model. Then we don't need it anymore. ########## sql/connect/common/src/main/protobuf/spark/connect/ml.proto: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = 'proto3'; + +package spark.connect; + +import "spark/connect/expressions.proto"; +import "spark/connect/relations.proto"; +import "spark/connect/ml_common.proto"; + +option java_multiple_files = true; +option java_package = "org.apache.spark.connect.proto"; + +// Command for ML +message MlCommand { + oneof command { + Fit fit = 1; + FetchAttr fetch_attr = 2; + Delete delete = 3; + Writer write = 4; + Reader read = 5; Review Comment: Sounds good, thx. ########## sql/connect/common/src/main/protobuf/spark/connect/ml.proto: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = 'proto3'; + +package spark.connect; + +import "spark/connect/expressions.proto"; +import "spark/connect/relations.proto"; +import "spark/connect/ml_common.proto"; + +option java_multiple_files = true; +option java_package = "org.apache.spark.connect.proto"; + +// Command for ML +message MlCommand { Review Comment: We could put the ML-specific commands into the existing Command message, but just like StreamingQueryCommand groups its related commands together, I'd prefer to group the ML-specific commands together. ########## sql/connect/common/src/main/protobuf/spark/connect/ml.proto: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = 'proto3'; + +package spark.connect; + +import "spark/connect/expressions.proto"; +import "spark/connect/relations.proto"; +import "spark/connect/ml_common.proto"; + +option java_multiple_files = true; +option java_package = "org.apache.spark.connect.proto"; + +// Command for ML +message MlCommand { + oneof command { + Fit fit = 1; + FetchAttr fetch_attr = 2; + Delete delete = 3; + Writer write = 4; Review Comment: sounds good.
thx ########## sql/connect/common/src/main/protobuf/spark/connect/relations.proto: ########## @@ -97,13 +98,60 @@ message Relation { // Catalog API (experimental / unstable) Catalog catalog = 200; + // ML relation + MlRelation ml_relation = 300; + // This field is used to mark extensions to the protocol. When plugins generate arbitrary // relations they can add them here. During the planning the correct resolution is done. google.protobuf.Any extension = 998; Unknown unknown = 999; } } +// Relation to represent ML world +message MlRelation { + oneof ml_type { + Transform transform = 1; + FetchAttr fetch_attr = 2; + } + // Relation to represent transform(input) of the operator + // which could be a cached model or a new transformer + message Transform { + oneof operator { + // Object reference + ObjectRef obj_ref = 1; + // Could be an ML transformer like VectorAssembler + MlOperator transformer = 2; + } + // the input dataframe + Relation input = 3; + // the operator specific parameters + MlParams params = 4; + } +} + +// Message for fetching attribute from object on the server side. +// FetchAttr can be represented as a Relation or a ML command +// Eg, model.coefficients, model.summary.weightedPrecision +// or model.summary.roc which returns a DataFrame +message FetchAttr { Review Comment: Yeah, it just passes the arguments of the attribute/property/method from the Python side to the server side to get the final result. But there are two different scenarios: 1. Get the attribute from a cached model. This case is quite simple: obj_ref is the id of the cached model, method is the name of the attribute/property/method, and args are the arguments of the method. 2. Get the attribute from an intermediate object. For this case, let's take `model.summary` as an example: model.summary returns a Summary object from a cached model on the server side, for example a LogisticRegressionTrainingSummary. The Python side also has a corresponding `LogisticRegressionTrainingSummary` class with the corresponding attributes, so once we have a LogisticRegressionTrainingSummary object on the Python side, we can access its attributes there. So what does this PR do for this case? First, when users call `model.summary`, it does not trigger any gRPC call; instead, the Python client stores "model.summary" into the LogisticRegressionTrainingSummary. Second, when users call `model.summary.weightedPrecision`, the Python client creates a Fetch proto by setting the fields below ``` obj_ref="model.summary" method="weightedPrecision" args="arguments of weightedPrecision" ``` After receiving the Fetch command, the server side finds the model instance in the cache according to the value of obj_ref, then calls `summary` to get a LogisticRegressionTrainingSummary object, calls weightedPrecision on that object, and returns the final result. ########## sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLHandler.scala: ########## @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.ml + +import scala.jdk.CollectionConverters.CollectionHasAsScala + +import org.apache.spark.connect.proto +import org.apache.spark.internal.Logging +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.param.ParamMap +import org.apache.spark.ml.util.{MLWritable, Summary} +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.connect.common.LiteralValueProtoConverter +import org.apache.spark.sql.connect.ml.Serializer.deserializeMethodArguments +import org.apache.spark.sql.connect.service.SessionHolder + +/** + * Helper function to get the attribute from an object by reflection + */ +private class AttributeHelper( + val sessionHolder: SessionHolder, + val objIdentifier: String, + val method: Option[String], + val argValues: Array[Object] = Array.empty, + val argClasses: Array[Class[_]] = Array.empty) { + + private val methodChain = method.map(n => s"$objIdentifier.$n").getOrElse(objIdentifier) + private val methodChains = methodChain.split("\\.") + private val modelId = methodChains.head + + protected lazy val instance = sessionHolder.mlCache.get(modelId) + private lazy val methods = methodChains.slice(1, methodChains.length) + + def getAttribute: Any = { + assert(methods.length >= 1) + if (argValues.length == 0) { + methods.foldLeft(instance) { (obj, attribute) => + MLUtils.invokeMethodAllowed(obj, attribute) + } + } else { + val lastMethod = methods.last + if (methods.length == 1) { + MLUtils.invokeMethodAllowed(instance, lastMethod, argValues, argClasses) + } else { + val prevMethods = methods.slice(0, methods.length - 1) + val finalObj = prevMethods.foldLeft(instance) { (obj, attribute) => + MLUtils.invokeMethodAllowed(obj, attribute) + } + MLUtils.invokeMethodAllowed(finalObj, lastMethod, argValues, argClasses) + } + } + } +} + +private class ModelAttributeHelper( + sessionHolder: SessionHolder, + objIdentifier: String, + method: Option[String], + argValues: Array[Object] = Array.empty, + argClasses: Array[Class[_]] = Array.empty) + extends AttributeHelper(sessionHolder, objIdentifier, method, argValues, argClasses) { + + def transform(relation: proto.MlRelation.Transform): DataFrame = { + // Create a copied model to avoid concurrently modify model params. 
+ val model = instance.asInstanceOf[Model[_]] + val copiedModel = model.copy(ParamMap.empty).asInstanceOf[Model[_]] + MLUtils.setInstanceParams(copiedModel, relation.getParams) + val inputDF = MLUtils.parseRelationProto(relation.getInput, sessionHolder) + copiedModel.transform(inputDF) + } +} + +private object AttributeHelper { + def apply( + sessionHolder: SessionHolder, + modelId: String, + method: Option[String] = None, + args: Array[proto.FetchAttr.Args] = Array.empty): AttributeHelper = { + val tmp = deserializeMethodArguments(args, sessionHolder) + val argValues = tmp.map(_._1) + val argClasses = tmp.map(_._2) + new AttributeHelper(sessionHolder, modelId, method, argValues, argClasses) + } +} + +private object ModelAttributeHelper { + def apply( + sessionHolder: SessionHolder, + modelId: String, + method: Option[String] = None, + args: Array[proto.FetchAttr.Args] = Array.empty): ModelAttributeHelper = { + val tmp = deserializeMethodArguments(args, sessionHolder) + val argValues = tmp.map(_._1) + val argClasses = tmp.map(_._2) + new ModelAttributeHelper(sessionHolder, modelId, method, argValues, argClasses) + } +} + +// MLHandler is a utility to group all ML operations +object MLHandler extends Logging { + def handleMlCommand( + sessionHolder: SessionHolder, + mlCommand: proto.MlCommand): proto.MlCommandResult = { + + val mlCache = sessionHolder.mlCache + + mlCommand.getCommandCase match { + case proto.MlCommand.CommandCase.FIT => + val fitCmd = mlCommand.getFit + val estimatorProto = fitCmd.getEstimator + assert(estimatorProto.getType == proto.MlOperator.OperatorType.ESTIMATOR) + + val dataset = MLUtils.parseRelationProto(fitCmd.getDataset, sessionHolder) + val estimator = MLUtils.getEstimator(estimatorProto, Some(fitCmd.getParams)) + val model = estimator.fit(dataset).asInstanceOf[Model[_]] + val id = mlCache.register(model) + proto.MlCommandResult + .newBuilder() + .setOperatorInfo( + proto.MlCommandResult.MlOperatorInfo + .newBuilder() + .setObjRef(proto.ObjectRef.newBuilder().setId(id))) + .build() + + case proto.MlCommand.CommandCase.FETCH_ATTR => + val args = mlCommand.getFetchAttr.getArgsList.asScala.toArray + val helper = AttributeHelper( + sessionHolder, + mlCommand.getFetchAttr.getObjRef.getId, + Option(mlCommand.getFetchAttr.getMethod), + args) + val attrResult = helper.getAttribute + attrResult match { + case s: Summary => + val id = mlCache.register(s) + proto.MlCommandResult.newBuilder().setSummary(id).build() + case _ => + val param = Serializer.serializeParam(attrResult) + proto.MlCommandResult.newBuilder().setParam(param).build() + } + + case proto.MlCommand.CommandCase.DELETE => + val modelId = mlCommand.getDelete.getObjRef.getId + var result = false + if (!modelId.contains(".")) { Review Comment: For example, the "modelId" here could be a model reference like AABBCC, or an attribute of the model, for example "AABBCC.summary", but the server side only caches the model, not the summary, so we need to exclude that scenario.
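To make the obj_ref convention from the comments above concrete: only the head of a dotted reference such as "AABBCC.summary.weightedPrecision" names an object in the server-side cache; the remaining segments are attribute or method names the server resolves via reflection, which is also why the delete path skips references containing a dot. A toy, self-contained sketch of that chain walk (the ToyModel/ToySummary classes and the plain Map cache are invented for illustration; this is not the PR's MLCache or AttributeHelper):

```scala
object FetchAttrSketch {
  // Stand-ins for a cached model and its training summary; invented for this sketch.
  class ToySummary { def weightedPrecision: Double = 0.95 }
  class ToyModel { def summary: ToySummary = new ToySummary }

  // Toy cache keyed by object id (the PR uses sessionHolder.mlCache instead).
  private val cache: Map[String, AnyRef] = Map("AABBCC" -> new ToyModel)

  def fetch(objRef: String): Any = {
    val segments = objRef.split("\\.")
    // Only the head segment ("AABBCC") is a cache key ...
    val root = cache(segments.head)
    // ... the remaining segments ("summary", "weightedPrecision") are invoked reflectively.
    segments.tail.foldLeft[AnyRef](root) { (obj, name) =>
      obj.getClass.getMethod(name).invoke(obj)
    }
  }

  def main(args: Array[String]): Unit = {
    println(fetch("AABBCC.summary.weightedPrecision")) // 0.95
    // Per the delete discussion above: a dotted reference is not itself a cached object.
    println("AABBCC.summary".contains("."))            // true
  }
}
```

In the PR itself, AttributeHelper performs this walk with MLUtils.invokeMethodAllowed and also handles method arguments deserialized from the proto.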
