[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978427835


##
connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.command
+
+import scala.collection.JavaConverters._
+
+import com.google.common.collect.{Lists, Maps}
+
+import org.apache.spark.annotation.{Since, Unstable}
+import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
+import org.apache.spark.sql.types.StringType
+
+@Unstable
+@Since("3.4.0")
+class SparkConnectCommandPlanner(session: SparkSession, command: proto.Command) {
+
+  lazy val pythonVersion =
+    sys.env.getOrElse("PYSPARK_PYTHON", sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python3"))

Review Comment:
   PTAL and reopen if it is unclear.
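
For readers skimming the archive: the quoted pythonVersion line resolves the Python executable from two environment variables with a hard-coded default. A minimal, standalone sketch of that fallback chain (not the PR's code, just the same pattern, with a hypothetical resolver object):

object PythonExecutableResolver {
  // Prefer PYSPARK_PYTHON, fall back to PYSPARK_DRIVER_PYTHON, then to "python3".
  def resolve(env: Map[String, String] = sys.env): String =
    env.getOrElse("PYSPARK_PYTHON", env.getOrElse("PYSPARK_DRIVER_PYTHON", "python3"))

  def main(args: Array[String]): Unit = {
    // With neither variable set, this prints "python3".
    println(resolve(Map.empty))
  }
}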



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978420688


##
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.ByteString
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.annotation.{Since, Unstable}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{Request, Response}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.connect.command.SparkConnectCommandPlanner
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.{
+  AdaptiveSparkPlanExec,
+  AdaptiveSparkPlanHelper,
+  QueryStageExec
+}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.util.ArrowUtils
+
+@Unstable
+@Since("3.4.0")
+class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) extends Logging {
+
+  def handle(v: Request): Unit = {
+    val session =
+      SparkConnectService.getOrCreateIsolatedSession(v.getUserContext.getUserId).session
+    v.getPlan.getOpTypeCase match {
+      case proto.Plan.OpTypeCase.COMMAND => handleCommand(session, v)
+      case proto.Plan.OpTypeCase.ROOT => handlePlan(session, v)
+      case _ =>
+        throw new UnsupportedOperationException(s"${v.getPlan.getOpTypeCase} not supported.")
+    }
+  }
+
+  def handlePlan(session: SparkSession, request: proto.Request): Unit = {
+    // Extract the plan from the request and convert it to a logical plan
+    val planner = new SparkConnectPlanner(request.getPlan.getRoot, session)
+    val rows =
+      Dataset.ofRows(session, planner.transform())
+    processRows(request.getClientId, rows)
+  }
+
+  private def processRows(clientId: String, rows: DataFrame) = {
+    val timeZoneId = SQLConf.get.sessionLocalTimeZone
+    val schema =
+      ByteString.copyFrom(ArrowUtils.toArrowSchema(rows.schema, timeZoneId).toByteArray)
+
+    val textSchema = rows.schema.fields.map(f => f.name).mkString("|")
+    val data = rows.collect().map(x => x.toSeq.mkString("|")).mkString("\n")
+    val bbb = proto.Response.CSVBatch.newBuilder
+      .setRowCount(-1)
+      .setData(textSchema ++ "\n" ++ data)
+      .build()
+    val response = proto.Response.newBuilder().setClientId(clientId).setCsvBatch(bbb).build()
+
+    // Send all the data
+    responseObserver.onNext(response)
+    responseObserver.onNext(sendMetricsToResponse(clientId, rows))
+    responseObserver.onCompleted()
+  }
+
+  def sendMetricsToResponse(clientId: String, rows: DataFrame): Response = {
+    // Send a last batch with the metrics
+    Response
+      .newBuilder()
+      .setClientId(clientId)
+      .setMetrics(MetricGenerator.buildMetrics(rows.queryExecution.executedPlan))
+      .build()
+  }
+
+  def handleCommand(session: SparkSession, request: Request): Unit = {
+    val command = request.getPlan.getCommand
+    val planner = new SparkConnectCommandPlanner(session, command)
+    planner.process()
+    responseObserver.onCompleted()
+

Review Comment:
   done
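
Context for the quoted handler: it drives a gRPC server-streaming call through io.grpc.stub.StreamObserver, sending the data batches first and the metrics batch last. A small hedged sketch of the observer contract it relies on (zero or more onNext calls, then exactly one onCompleted or onError); DemoResponse is a hypothetical stand-in for the proto Response type:

import io.grpc.stub.StreamObserver

object StreamingContractDemo {
  // Hypothetical payload type standing in for proto.Response.
  final case class DemoResponse(payload: String)

  def streamResults(results: Seq[DemoResponse], observer: StreamObserver[DemoResponse]): Unit = {
    try {
      // Data batches go out first, one onNext call per batch.
      results.foreach(observer.onNext)
      // The stream must be terminated exactly once.
      observer.onCompleted()
    } catch {
      // On failure, onError is the terminal call instead; onCompleted must not follow it.
      case e: Throwable => observer.onError(e)
    }
  }
}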



##
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978420144


##
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.ByteString
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.annotation.{Since, Unstable}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{Request, Response}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.connect.command.SparkConnectCommandPlanner
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.{
+  AdaptiveSparkPlanExec,
+  AdaptiveSparkPlanHelper,
+  QueryStageExec
+}

Review Comment:
   done.





[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978419296


##
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Since, Unstable}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+
+/**
+ * The SparkConnectService implementation.
+ *
+ * This class implements the service stub generated by gRPC.
+ *
+ * @param debug
+ *   delegates debug behavior to the handlers.
+ */
+@Unstable
+@Since("3.4.0")
+class SparkConnectService(debug: Boolean)
+    extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
+    with Logging {
+
+  /**
+   * This is the main entry method for Spark Connect and all calls to execute a plan.
+   *
+   * The plan execution is delegated to the [[SparkConnectStreamHandler]]. All error handling
+   * should be directly implemented in the deferred implementation. But this method catches
+   * generic errors.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def executePlan(request: Request, responseObserver: StreamObserver[Response]): Unit = {
+    try {
+      new SparkConnectStreamHandler(responseObserver).handle(request)
+    } catch {
+      case e: Throwable =>
+        log.error("Error executing plan.", e)
+        responseObserver.onError(
+          Status.UNKNOWN.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
+    }
+  }
+
+  /**
+   * Analyze a plan to provide metadata and debugging information.
+   *
+   * This method is called to generate the explain plan for a Spark Connect plan. In its simplest
+   * implementation, the plan that is generated by the [[SparkConnectPlanner]] is used to build a
+   * [[Dataset]] and derive the explain string from the query execution details.
+   *
+   * Errors during planning are returned via the [[StreamObserver]] interface.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def analyzePlan(
+      request: Request,
+      responseObserver: StreamObserver[AnalyzeResponse]): Unit = {
+    try {
+      val session =
+        SparkConnectService.getOrCreateIsolatedSession(request.getUserContext.getUserId).session
+
+      val logicalPlan = request.getPlan.getOpTypeCase match {
+        case proto.Plan.OpTypeCase.ROOT =>
+          new SparkConnectPlanner(request.getPlan.getRoot, session).transform()
+        case _ =>
+          responseObserver.onError(
+            new UnsupportedOperationException(
+              s"${request.getPlan.getOpTypeCase} not supported for analysis."))
+          return
+      }
+      val ds = Dataset.ofRows(session, logicalPlan)
+      val explainString = ds.queryExecution.explainString(ExtendedMode)
+
+      val resp = proto.AnalyzeResponse
+        .newBuilder()
+        .setExplainString(explainString)
+        .setClientId(request.getClientId)
+
+      resp.addAllColumnTypes(ds.schema.fields.map(_.dataType.sql).toSeq.asJava)
+      resp.addAllColumnNames(ds.schema.fields.map(_.name).toSeq.asJava)
+      responseObserver.onNext(resp.build())
+      responseObserver.onCompleted()
+    } catch {
+      case e: Throwable =>
+        log.error("Error analyzing plan.", e)
+

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978419138


##
connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.{Since, Unstable}
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{expressions, plans}
+import org.apache.spark.sql.catalyst.analysis.{
+  UnresolvedAlias,
+  UnresolvedAttribute,
+  UnresolvedFunction,
+  UnresolvedRelation,
+  UnresolvedStar
+}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.types.{
+  BinaryType,
+  ByteType,
+  DateType,
+  DoubleType,
+  FloatType,
+  IntegerType,
+  ShortType,
+  TimestampType
+}

Review Comment:
   Ok, I looked around and understand better now. Adjusted accordingly.



##
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Since, Unstable}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}

Review Comment:
   done.
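
Since the PR title describes Spark Connect as a driver plugin, and the quoted hunk imports DriverPlugin, ExecutorPlugin, PluginContext and SparkPlugin, here is a hedged sketch of how a driver-only service could be wired in through Spark's plugin API. The class name ConnectDemoPlugin and the init/shutdown bodies are illustrative assumptions, not the PR's actual plugin:

import java.util.{Collections, Map => JMap}

import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

class ConnectDemoPlugin extends SparkPlugin {
  override def driverPlugin(): DriverPlugin = new DriverPlugin {
    override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
      // A real implementation would start the gRPC endpoint here.
      Collections.emptyMap[String, String]()
    }
    override def shutdown(): Unit = {
      // ... and stop it here.
    }
  }

  // A driver-only service needs no executor-side component; returning null is allowed.
  override def executorPlugin(): ExecutorPlugin = null
}

Such a plugin is activated via the spark.plugins configuration; the concrete plugin class used by the PR is not shown in this excerpt.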






[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978407163


##
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Since, Unstable}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+
+/**
+ * The SparkConnectService implementation.
+ *
+ * This class implements the service stub from the generated code of GRPC.
+ *
+ * @param debug
+ *   delegates debug behavior to the handlers.
+ */
+@Unstable
+@Since("3.4.0")
+class SparkConnectService(debug: Boolean)
+extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
+with Logging {
+
+  /**
+   * This is the main entry method for Spark Connect and all calls to execute 
a plan.
+   *
+   * The plan execution is delegated to the [[SparkConnectStreamHandler]]. All 
error handling
+   * should be directly implemented in the deferred implementation. But this 
method catches
+   * generic errors.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def executePlan(request: Request, responseObserver: 
StreamObserver[Response]): Unit = {
+try {
+  new SparkConnectStreamHandler(responseObserver).handle(request)
+} catch {
+  case e: Throwable =>
+log.error("Error executing plan.", e)
+responseObserver.onError(
+  
Status.UNKNOWN.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
+}
+  }
+
+  /**
+   * Analyze a plan provide metadata and debugging information.
+   *
+   * This method is called to generate the explain plan for a SparkConnect 
plan. In its simplest
+   * implementation, the plan that is generated by the [[SparkConnectPlanner]] 
is used to build a
+   * [[Dataset]] and derive the explain string from the query execution 
details.
+   *
+   * Errors during planning are returned via the [[StreamObserver]] interface.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def analyzePlan(
+  request: Request,
+  responseObserver: StreamObserver[AnalyzeResponse]): Unit = {
+try {
+  val session =
+
SparkConnectService.getOrCreateIsolatedSession(request.getUserContext.getUserId).session
+
+  val logicalPlan = request.getPlan.getOpTypeCase match {
+case proto.Plan.OpTypeCase.ROOT =>
+  new SparkConnectPlanner(request.getPlan.getRoot, session).transform()
+case _ =>
+  responseObserver.onError(
+new UnsupportedOperationException(
+  s"${request.getPlan.getOpTypeCase} not supported for analysis."))
+  return
+  }
+  val ds = Dataset.ofRows(session, logicalPlan)
+  val explainString = ds.queryExecution.explainString(ExtendedMode)
+
+  val resp = proto.AnalyzeResponse
+.newBuilder()
+.setExplainString(explainString)
+.setClientId(request.getClientId)
+
+  resp.addAllColumnTypes(ds.schema.fields.map(_.dataType.sql).toSeq.asJava)
+  resp.addAllColumnNames(ds.schema.fields.map(_.name).toSeq.asJava)
+  responseObserver.onNext(resp.build())
+  responseObserver.onCompleted()
+} catch {
+  case e: Throwable =>
+log.error("Error analyzing plan.", e)
+

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978407794


##
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Since, Unstable}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+
+/**
+ * The SparkConnectService implementation.
+ *
+ * This class implements the service stub from the generated code of GRPC.
+ *
+ * @param debug
+ *   delegates debug behavior to the handlers.
+ */
+@Unstable
+@Since("3.4.0")
+class SparkConnectService(debug: Boolean)
+extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
+with Logging {
+
+  /**
+   * This is the main entry method for Spark Connect and all calls to execute 
a plan.
+   *
+   * The plan execution is delegated to the [[SparkConnectStreamHandler]]. All 
error handling
+   * should be directly implemented in the deferred implementation. But this 
method catches
+   * generic errors.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def executePlan(request: Request, responseObserver: 
StreamObserver[Response]): Unit = {
+try {
+  new SparkConnectStreamHandler(responseObserver).handle(request)
+} catch {
+  case e: Throwable =>
+log.error("Error executing plan.", e)
+responseObserver.onError(
+  
Status.UNKNOWN.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
+}
+  }
+
+  /**
+   * Analyze a plan provide metadata and debugging information.
+   *
+   * This method is called to generate the explain plan for a SparkConnect 
plan. In its simplest
+   * implementation, the plan that is generated by the [[SparkConnectPlanner]] 
is used to build a
+   * [[Dataset]] and derive the explain string from the query execution 
details.
+   *
+   * Errors during planning are returned via the [[StreamObserver]] interface.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def analyzePlan(
+  request: Request,
+  responseObserver: StreamObserver[AnalyzeResponse]): Unit = {
+try {
+  val session =
+
SparkConnectService.getOrCreateIsolatedSession(request.getUserContext.getUserId).session
+
+  val logicalPlan = request.getPlan.getOpTypeCase match {
+case proto.Plan.OpTypeCase.ROOT =>
+  new SparkConnectPlanner(request.getPlan.getRoot, session).transform()
+case _ =>
+  responseObserver.onError(
+new UnsupportedOperationException(
+  s"${request.getPlan.getOpTypeCase} not supported for analysis."))
+  return
+  }
+  val ds = Dataset.ofRows(session, logicalPlan)
+  val explainString = ds.queryExecution.explainString(ExtendedMode)
+
+  val resp = proto.AnalyzeResponse
+.newBuilder()
+.setExplainString(explainString)
+.setClientId(request.getClientId)
+
+  resp.addAllColumnTypes(ds.schema.fields.map(_.dataType.sql).toSeq.asJava)
+  resp.addAllColumnNames(ds.schema.fields.map(_.name).toSeq.asJava)
+  responseObserver.onNext(resp.build())
+  responseObserver.onCompleted()
+} catch {
+  case e: Throwable =>
+log.error("Error analyzing plan.", e)
+

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978406868


##
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Since, Unstable}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+
+/**
+ * The SparkConnectService implementation.
+ *
+ * This class implements the service stub from the generated code of GRPC.
+ *
+ * @param debug
+ *   delegates debug behavior to the handlers.
+ */
+@Unstable
+@Since("3.4.0")
+class SparkConnectService(debug: Boolean)
+extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
+with Logging {
+
+  /**
+   * This is the main entry method for Spark Connect and all calls to execute 
a plan.
+   *
+   * The plan execution is delegated to the [[SparkConnectStreamHandler]]. All 
error handling
+   * should be directly implemented in the deferred implementation. But this 
method catches
+   * generic errors.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def executePlan(request: Request, responseObserver: 
StreamObserver[Response]): Unit = {
+try {
+  new SparkConnectStreamHandler(responseObserver).handle(request)
+} catch {
+  case e: Throwable =>
+log.error("Error executing plan.", e)
+responseObserver.onError(
+  
Status.UNKNOWN.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
+}
+  }
+
+  /**
+   * Analyze a plan provide metadata and debugging information.
+   *
+   * This method is called to generate the explain plan for a SparkConnect 
plan. In its simplest
+   * implementation, the plan that is generated by the [[SparkConnectPlanner]] 
is used to build a
+   * [[Dataset]] and derive the explain string from the query execution 
details.
+   *
+   * Errors during planning are returned via the [[StreamObserver]] interface.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def analyzePlan(
+  request: Request,
+  responseObserver: StreamObserver[AnalyzeResponse]): Unit = {
+try {
+  val session =
+
SparkConnectService.getOrCreateIsolatedSession(request.getUserContext.getUserId).session
+
+  val logicalPlan = request.getPlan.getOpTypeCase match {
+case proto.Plan.OpTypeCase.ROOT =>
+  new SparkConnectPlanner(request.getPlan.getRoot, session).transform()
+case _ =>
+  responseObserver.onError(
+new UnsupportedOperationException(
+  s"${request.getPlan.getOpTypeCase} not supported for analysis."))
+  return
+  }
+  val ds = Dataset.ofRows(session, logicalPlan)
+  val explainString = ds.queryExecution.explainString(ExtendedMode)
+
+  val resp = proto.AnalyzeResponse
+.newBuilder()
+.setExplainString(explainString)
+.setClientId(request.getClientId)
+
+  resp.addAllColumnTypes(ds.schema.fields.map(_.dataType.sql).toSeq.asJava)
+  resp.addAllColumnNames(ds.schema.fields.map(_.name).toSeq.asJava)
+  responseObserver.onNext(resp.build())
+  responseObserver.onCompleted()
+} catch {
+  case e: Throwable =>
+log.error("Error analyzing plan.", e)
+

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978406012


##
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Since, Unstable}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+
+/**
+ * The SparkConnectService implementation.
+ *
+ * This class implements the service stub from the generated code of GRPC.
+ *
+ * @param debug
+ *   delegates debug behavior to the handlers.
+ */
+@Unstable
+@Since("3.4.0")
+class SparkConnectService(debug: Boolean)
+extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
+with Logging {
+
+  /**
+   * This is the main entry method for Spark Connect and all calls to execute 
a plan.
+   *
+   * The plan execution is delegated to the [[SparkConnectStreamHandler]]. All 
error handling
+   * should be directly implemented in the deferred implementation. But this 
method catches
+   * generic errors.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def executePlan(request: Request, responseObserver: 
StreamObserver[Response]): Unit = {
+try {
+  new SparkConnectStreamHandler(responseObserver).handle(request)
+} catch {
+  case e: Throwable =>
+log.error("Error executing plan.", e)
+responseObserver.onError(
+  
Status.UNKNOWN.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
+}
+  }
+
+  /**
+   * Analyze a plan provide metadata and debugging information.
+   *
+   * This method is called to generate the explain plan for a SparkConnect 
plan. In its simplest
+   * implementation, the plan that is generated by the [[SparkConnectPlanner]] 
is used to build a
+   * [[Dataset]] and derive the explain string from the query execution 
details.
+   *
+   * Errors during planning are returned via the [[StreamObserver]] interface.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def analyzePlan(
+  request: Request,
+  responseObserver: StreamObserver[AnalyzeResponse]): Unit = {
+try {
+  val session =
+
SparkConnectService.getOrCreateIsolatedSession(request.getUserContext.getUserId).session
+
+  val logicalPlan = request.getPlan.getOpTypeCase match {
+case proto.Plan.OpTypeCase.ROOT =>
+  new SparkConnectPlanner(request.getPlan.getRoot, session).transform()
+case _ =>
+  responseObserver.onError(
+new UnsupportedOperationException(
+  s"${request.getPlan.getOpTypeCase} not supported for analysis."))
+  return
+  }
+  val ds = Dataset.ofRows(session, logicalPlan)
+  val explainString = ds.queryExecution.explainString(ExtendedMode)
+
+  val resp = proto.AnalyzeResponse
+.newBuilder()
+.setExplainString(explainString)
+.setClientId(request.getClientId)
+
+  resp.addAllColumnTypes(ds.schema.fields.map(_.dataType.sql).toSeq.asJava)
+  resp.addAllColumnNames(ds.schema.fields.map(_.name).toSeq.asJava)
+  responseObserver.onNext(resp.build())
+  responseObserver.onCompleted()
+} catch {
+  case e: Throwable =>
+log.error("Error analyzing plan.", e)
+

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978405726


##
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Since, Unstable}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+
+/**
+ * The SparkConnectService implementation.
+ *
+ * This class implements the service stub from the generated code of GRPC.
+ *
+ * @param debug
+ *   delegates debug behavior to the handlers.
+ */
+@Unstable
+@Since("3.4.0")
+class SparkConnectService(debug: Boolean)
+extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
+with Logging {
+
+  /**
+   * This is the main entry method for Spark Connect and all calls to execute 
a plan.
+   *
+   * The plan execution is delegated to the [[SparkConnectStreamHandler]]. All 
error handling
+   * should be directly implemented in the deferred implementation. But this 
method catches
+   * generic errors.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def executePlan(request: Request, responseObserver: 
StreamObserver[Response]): Unit = {
+try {
+  new SparkConnectStreamHandler(responseObserver).handle(request)
+} catch {
+  case e: Throwable =>
+log.error("Error executing plan.", e)
+responseObserver.onError(
+  
Status.UNKNOWN.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
+}
+  }
+
+  /**
+   * Analyze a plan provide metadata and debugging information.
+   *
+   * This method is called to generate the explain plan for a SparkConnect 
plan. In its simplest
+   * implementation, the plan that is generated by the [[SparkConnectPlanner]] 
is used to build a
+   * [[Dataset]] and derive the explain string from the query execution 
details.
+   *
+   * Errors during planning are returned via the [[StreamObserver]] interface.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def analyzePlan(
+  request: Request,
+  responseObserver: StreamObserver[AnalyzeResponse]): Unit = {
+try {
+  val session =
+
SparkConnectService.getOrCreateIsolatedSession(request.getUserContext.getUserId).session
+
+  val logicalPlan = request.getPlan.getOpTypeCase match {
+case proto.Plan.OpTypeCase.ROOT =>
+  new SparkConnectPlanner(request.getPlan.getRoot, session).transform()
+case _ =>
+  responseObserver.onError(
+new UnsupportedOperationException(
+  s"${request.getPlan.getOpTypeCase} not supported for analysis."))
+  return
+  }
+  val ds = Dataset.ofRows(session, logicalPlan)
+  val explainString = ds.queryExecution.explainString(ExtendedMode)
+
+  val resp = proto.AnalyzeResponse
+.newBuilder()
+.setExplainString(explainString)
+.setClientId(request.getClientId)
+
+  resp.addAllColumnTypes(ds.schema.fields.map(_.dataType.sql).toSeq.asJava)
+  resp.addAllColumnNames(ds.schema.fields.map(_.name).toSeq.asJava)
+  responseObserver.onNext(resp.build())
+  responseObserver.onCompleted()
+} catch {
+  case e: Throwable =>
+log.error("Error analyzing plan.", e)
+

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978405412


##
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Since, Unstable}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+
+/**
+ * The SparkConnectService implementation.
+ *
+ * This class implements the service stub from the generated code of GRPC.
+ *
+ * @param debug
+ *   delegates debug behavior to the handlers.
+ */
+@Unstable
+@Since("3.4.0")
+class SparkConnectService(debug: Boolean)
+extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
+with Logging {
+
+  /**
+   * This is the main entry method for Spark Connect and all calls to execute 
a plan.
+   *
+   * The plan execution is delegated to the [[SparkConnectStreamHandler]]. All 
error handling
+   * should be directly implemented in the deferred implementation. But this 
method catches
+   * generic errors.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def executePlan(request: Request, responseObserver: 
StreamObserver[Response]): Unit = {
+try {
+  new SparkConnectStreamHandler(responseObserver).handle(request)
+} catch {
+  case e: Throwable =>
+log.error("Error executing plan.", e)
+responseObserver.onError(
+  
Status.UNKNOWN.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
+}
+  }
+
+  /**
+   * Analyze a plan provide metadata and debugging information.
+   *
+   * This method is called to generate the explain plan for a SparkConnect 
plan. In its simplest
+   * implementation, the plan that is generated by the [[SparkConnectPlanner]] 
is used to build a
+   * [[Dataset]] and derive the explain string from the query execution 
details.
+   *
+   * Errors during planning are returned via the [[StreamObserver]] interface.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def analyzePlan(
+  request: Request,
+  responseObserver: StreamObserver[AnalyzeResponse]): Unit = {
+try {
+  val session =
+
SparkConnectService.getOrCreateIsolatedSession(request.getUserContext.getUserId).session
+
+  val logicalPlan = request.getPlan.getOpTypeCase match {
+case proto.Plan.OpTypeCase.ROOT =>
+  new SparkConnectPlanner(request.getPlan.getRoot, session).transform()
+case _ =>
+  responseObserver.onError(
+new UnsupportedOperationException(
+  s"${request.getPlan.getOpTypeCase} not supported for analysis."))
+  return
+  }
+  val ds = Dataset.ofRows(session, logicalPlan)
+  val explainString = ds.queryExecution.explainString(ExtendedMode)
+
+  val resp = proto.AnalyzeResponse
+.newBuilder()
+.setExplainString(explainString)
+.setClientId(request.getClientId)
+
+  resp.addAllColumnTypes(ds.schema.fields.map(_.dataType.sql).toSeq.asJava)
+  resp.addAllColumnNames(ds.schema.fields.map(_.name).toSeq.asJava)
+  responseObserver.onNext(resp.build())
+  responseObserver.onCompleted()
+} catch {
+  case e: Throwable =>
+log.error("Error analyzing plan.", e)
+

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978404522


##
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Since, Unstable}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+
+/**
+ * The SparkConnectService implementation.
+ *
+ * This class implements the service stub from the generated code of GRPC.
+ *
+ * @param debug
+ *   delegates debug behavior to the handlers.
+ */
+@Unstable
+@Since("3.4.0")
+class SparkConnectService(debug: Boolean)
+extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
+with Logging {
+
+  /**
+   * This is the main entry method for Spark Connect and all calls to execute 
a plan.
+   *
+   * The plan execution is delegated to the [[SparkConnectStreamHandler]]. All 
error handling
+   * should be directly implemented in the deferred implementation. But this 
method catches
+   * generic errors.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def executePlan(request: Request, responseObserver: 
StreamObserver[Response]): Unit = {
+try {
+  new SparkConnectStreamHandler(responseObserver).handle(request)
+} catch {
+  case e: Throwable =>
+log.error("Error executing plan.", e)
+responseObserver.onError(
+  
Status.UNKNOWN.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
+}
+  }
+
+  /**
+   * Analyze a plan provide metadata and debugging information.

Review Comment:
   Done.
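
The doc comment this "Done." attaches to describes the analysis flow: build a Dataset from the planned logical plan, then derive the extended explain string and schema metadata from its query execution. A local, hedged illustration of that flow (not the service code itself):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ExtendedMode

object AnalyzeDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("analyze-demo").getOrCreate()
    import spark.implicits._

    // Any Dataset works; the service would build one from the decoded Spark Connect plan.
    val ds = Seq((1, "a"), (2, "b")).toDF("id", "name").filter($"id" > 1)

    // Same calls as the quoted analyzePlan: extended explain string plus column names and types.
    val explainString = ds.queryExecution.explainString(ExtendedMode)
    val columnNames = ds.schema.fields.map(_.name)
    val columnTypes = ds.schema.fields.map(_.dataType.sql)

    println(explainString)
    println(columnNames.zip(columnTypes).map { case (n, t) => s"$n: $t" }.mkString(", "))
    spark.stop()
  }
}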



##
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Since, Unstable}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978404056


##
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Since, Unstable}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+
+/**
+ * The SparkConnectService implementation.
+ *
+ * This class implements the service stub from the generated code of GRPC.
+ *
+ * @param debug
+ *   delegates debug behavior to the handlers.
+ */
+@Unstable
+@Since("3.4.0")
+class SparkConnectService(debug: Boolean)
+extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
+with Logging {

Review Comment:
   done.
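
This last comment sits on the service class declaration. Elsewhere in the same file the quoted imports pull in Guava's CacheBuilder and Ticker, and the handlers call SparkConnectService.getOrCreateIsolatedSession(userId), which suggests a per-user session cache. A hedged sketch of such a cache; SessionHolder and the eviction settings are illustrative assumptions, not the PR's actual values:

import java.util.concurrent.TimeUnit

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object SessionCacheDemo {
  // Hypothetical holder for whatever per-user state the real service keeps.
  final case class SessionHolder(userId: String)

  private val cache: LoadingCache[String, SessionHolder] = CacheBuilder
    .newBuilder()
    .maximumSize(100)
    .expireAfterAccess(1, TimeUnit.HOURS)
    .build(new CacheLoader[String, SessionHolder] {
      override def load(userId: String): SessionHolder = SessionHolder(userId)
    })

  // Repeated lookups for the same user return the cached holder until it is evicted.
  def getOrCreate(userId: String): SessionHolder = cache.get(userId)

  def main(args: Array[String]): Unit = {
    assert(getOrCreate("alice") eq getOrCreate("alice"))
  }
}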





[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978401192


##
connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.{Since, Unstable}
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{expressions, plans}
+import org.apache.spark.sql.catalyst.analysis.{
+  UnresolvedAlias,
+  UnresolvedAttribute,
+  UnresolvedFunction,
+  UnresolvedRelation,
+  UnresolvedStar
+}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.types.{
+  BinaryType,
+  ByteType,
+  DateType,
+  DoubleType,
+  FloatType,
+  IntegerType,
+  ShortType,
+  TimestampType
+}
+
+final case class InvalidPlanInput(
+private val message: String = "",
+private val cause: Throwable = None.orNull)
+extends Exception(message, cause)
+
+@Unstable
+@Since("3.4.0")
+class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) {
+
+  def transform(): LogicalPlan = {
+transformRelation(plan)
+  }
+
+  // The root of the query plan is a relation and we apply the transformations to it.
+  private def transformRelation(rel: proto.Relation): LogicalPlan = {
+val common = if (rel.hasCommon) {
+  Some(rel.getCommon)
+} else {
+  None
+}
+
+rel.getRelTypeCase match {
+  case proto.Relation.RelTypeCase.READ => transformReadRel(rel.getRead, common)
+  case proto.Relation.RelTypeCase.PROJECT => transformProject(rel.getProject, common)
+  case proto.Relation.RelTypeCase.FILTER => transformFilter(rel.getFilter)
+  case proto.Relation.RelTypeCase.FETCH => transformFetch(rel.getFetch)
+  case proto.Relation.RelTypeCase.JOIN => transformJoin(rel.getJoin)
+  case proto.Relation.RelTypeCase.UNION => transformUnion(rel.getUnion)
+  case proto.Relation.RelTypeCase.SORT => transformSort(rel.getSort)
+  case proto.Relation.RelTypeCase.AGGREGATE => transformAggregate(rel.getAggregate)
+  case proto.Relation.RelTypeCase.SQL => transformSql(rel.getSql)
+  case proto.Relation.RelTypeCase.RELTYPE_NOT_SET =>
+throw new IndexOutOfBoundsException("Expected Relation to be set, but is empty.")
+  case _ => throw InvalidPlanInput(s"${rel.getUnknown} not supported.")
+}
+  }
+
+  private def transformSql(sql: proto.SQL): LogicalPlan = {
+session.sessionState.sqlParser.parsePlan(sql.getQuery)
+  }
+
+  private def transformReadRel(
+  rel: proto.Read,
+  common: Option[proto.RelationCommon]): LogicalPlan = {
+val baseRelation = rel.getReadTypeCase match {
+  case proto.Read.ReadTypeCase.NAMED_TABLE =>
+val child = UnresolvedRelation(rel.getNamedTable.getPartsList.asScala.toSeq)
+if (common.nonEmpty && common.get.getAlias.nonEmpty) {
+  SubqueryAlias(identifier = common.get.getAlias, child = child)
+} else {
+  child
+}
+  case _ => throw InvalidPlanInput()
+}
+baseRelation
+  }
+
+  private def transformFilter(rel: proto.Filter): LogicalPlan = {
+assert(rel.hasInput)
+val baseRel = transformRelation(rel.getInput)
+logical.Filter(condition = transformExpression(rel.getCondition), child = baseRel)
+  }
+
+  private def transformProject(
+  rel: proto.Project,
+  common: Option[proto.RelationCommon]): LogicalPlan = {
+val baseRel = transformRelation(rel.getInput)
+val projection = if (rel.getExpressionsCount == 0) {
+  Seq(UnresolvedStar(Option.empty))
+} else {
+  rel.getExpressionsList.asScala.map(transformExpression).map(UnresolvedAlias(_))
+}
+val project = logical.Project(projectList = projection.toSeq, child = baseRel)
+if (common.nonEmpty && common.get.getAlias.nonEmpty) {
+  logical.SubqueryAlias(identifier = common.get.getAlias, child = 

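The quoted planner is the piece that turns a proto Relation into a Catalyst LogicalPlan via transformRelation. A rough usage sketch follows; the protobuf builder calls (Relation.newBuilder, setSql, setQuery) are inferred from the quoted definitions rather than copied from the PR.

// Hedged sketch: build a SQL relation message by hand and let the planner resolve it.
import org.apache.spark.connect.proto
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connect.planner.SparkConnectPlanner

object PlannerUsageSketch {
  def planSqlQuery(session: SparkSession, query: String): LogicalPlan = {
    val relation = proto.Relation.newBuilder()
      .setSql(proto.SQL.newBuilder().setQuery(query))
      .build()
    // transformRelation dispatches on RelTypeCase.SQL and hands the query text to the SQL parser.
    new SparkConnectPlanner(relation, session).transform()
  }
}
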
[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978400858


##
connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978364360


##
connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978363759


##
connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978361286


##
connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978359822


##
connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978358455


##
connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.{Since, Unstable}
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{expressions, plans}
+import org.apache.spark.sql.catalyst.analysis.{
+  UnresolvedAlias,
+  UnresolvedAttribute,
+  UnresolvedFunction,
+  UnresolvedRelation,
+  UnresolvedStar
+}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.types.{
+  BinaryType,
+  ByteType,
+  DateType,
+  DoubleType,
+  FloatType,
+  IntegerType,
+  ShortType,
+  TimestampType
+}
+
+final case class InvalidPlanInput(
+private val message: String = "",
+private val cause: Throwable = None.orNull)
+extends Exception(message, cause)
+
+@Unstable
+@Since("3.4.0")
+class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) {
+
+  def transform(): LogicalPlan = {
+transformRelation(plan)
+  }
+
+  // The root of the query plan is a relation and we apply the transformations 
to it.
+  private def transformRelation(rel: proto.Relation): LogicalPlan = {
+val common = if (rel.hasCommon) {
+  Some(rel.getCommon)
+} else {
+  None
+}
+
+rel.getRelTypeCase match {
+  case proto.Relation.RelTypeCase.READ => transformReadRel(rel.getRead, 
common)
+  case proto.Relation.RelTypeCase.PROJECT => 
transformProject(rel.getProject, common)
+  case proto.Relation.RelTypeCase.FILTER => transformFilter(rel.getFilter)
+  case proto.Relation.RelTypeCase.FETCH => transformFetch(rel.getFetch)
+  case proto.Relation.RelTypeCase.JOIN => transformJoin(rel.getJoin)
+  case proto.Relation.RelTypeCase.UNION => transformUnion(rel.getUnion)
+  case proto.Relation.RelTypeCase.SORT => transformSort(rel.getSort)
+  case proto.Relation.RelTypeCase.AGGREGATE => 
transformAggregate(rel.getAggregate)
+  case proto.Relation.RelTypeCase.SQL => transformSql(rel.getSql)
+  case proto.Relation.RelTypeCase.RELTYPE_NOT_SET =>
+throw new IndexOutOfBoundsException("Expected Relation to be set, but 
is empty.")
+  case _ => throw InvalidPlanInput(s"${rel.getUnknown} not supported.")
+}
+  }
+
+  private def transformSql(sql: proto.SQL): LogicalPlan = {
+session.sessionState.sqlParser.parsePlan(sql.getQuery)
+  }
+
+  private def transformReadRel(
+  rel: proto.Read,
+  common: Option[proto.RelationCommon]): LogicalPlan = {
+val baseRelation = rel.getReadTypeCase match {
+  case proto.Read.ReadTypeCase.NAMED_TABLE =>
+val child = 
UnresolvedRelation(rel.getNamedTable.getPartsList.asScala.toSeq)
+if (common.nonEmpty && common.get.getAlias.nonEmpty) {
+  SubqueryAlias(identifier = common.get.getAlias, child = child)
+} else {
+  child
+}
+  case _ => throw InvalidPlanInput()
+}
+baseRelation
+  }
+
+  private def transformFilter(rel: proto.Filter): LogicalPlan = {
+assert(rel.hasInput)
+val baseRel = transformRelation(rel.getInput)
+logical.Filter(condition = transformExpression(rel.getCondition), child = 
baseRel)
+  }
+
+  private def transformProject(
+  rel: proto.Project,
+  common: Option[proto.RelationCommon]): LogicalPlan = {
+val baseRel = transformRelation(rel.getInput)
+val projection = if (rel.getExpressionsCount == 0) {
+  Seq(UnresolvedStar(Option.empty))
+} else {
+  
rel.getExpressionsList.asScala.map(transformExpression).map(UnresolvedAlias(_))
+}
+val project = logical.Project(projectList = projection.toSeq, child = 
baseRel)
+if (common.nonEmpty && common.get.getAlias.nonEmpty) {
+  logical.SubqueryAlias(identifier = common.get.getAlias, child = 

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978358109


##
connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.command
+
+import scala.collection.JavaConverters._
+
+import com.google.common.collect.{Lists, Maps}
+
+import org.apache.spark.annotation.{Since, Unstable}
+import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
+import org.apache.spark.sql.types.StringType
+
+@Unstable
+@Since("3.4.0")
+class SparkConnectCommandPlanner(session: SparkSession, command: proto.Command) {
+
+  lazy val pythonVersion =
+sys.env.getOrElse("PYSPARK_PYTHON", sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python3"))
+
+  def process(): Unit = {
+command.getCommandTypeCase match {
+  case proto.Command.CommandTypeCase.CREATE_FUNCTION =>
+handleCreateScalarFunction(command.getCreateFunction)
+  case _ => throw new UnsupportedOperationException(s"${command} not supported.")

Review Comment:
   Done.






[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978357079


##
connect/src/main/protobuf/spark/connect/commands.proto:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = 'proto3';
+
+import "spark/connect/types.proto";
+
+package spark.connect;
+
+option java_multiple_files = true;
+option java_package = "org.apache.spark.connect.proto";
+option go_package = "github.com/databricks/spark-connect/proto";
+
+// A [[Command]] is an operation that is executed by the server that does not directly consume or
+// produce a relational result.
+message Command {
+  oneof command_type {
+CreateScalarFunction create_function = 1;
+  }
+}
+
+// Simple message that is used to create a scalar function based on the provided function body.
+//
+// This message is used to register for example a Python UDF in the session catalog by providing
+// the serialized method body.
+message CreateScalarFunction {
+  // Fully qualified name of the function including the catalog / schema names.
+  repeated string parts = 1;
+  FunctionLanguage language = 2;
+  bool temporary = 3;
+  repeated Type argument_types = 4;
+  Type return_type = 5;
+
+  // How the function body is defined:
+  oneof function_definition {
+// As a raw string serialized:
+bytes serialized_function = 6;
+// As a code literal
+string literal_string = 7;
+  }
+
+  enum FunctionLanguage {
+FUNCTION_LANGUAGE_UNSPECIFIED = 0;
+FUNCTION_LANGUAGE_SQL = 1;
+FUNCTION_LANGUAGE_PYTHON = 2;

Review Comment:
   Yes, absolutely. The Python version is actually already needed; I will update [SPARK-40532](https://issues.apache.org/jira/browse/SPARK-40532) to reflect that and will leave a comment in the proto as well.






[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978352846


##
connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.command
+
+import scala.collection.JavaConverters._
+
+import com.google.common.collect.{Lists, Maps}
+
+import org.apache.spark.annotation.{Since, Unstable}
+import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
+import org.apache.spark.sql.types.StringType
+
+@Unstable
+@Since("3.4.0")
+class SparkConnectCommandPlanner(session: SparkSession, command: proto.Command) {
+
+  lazy val pythonVersion =
+sys.env.getOrElse("PYSPARK_PYTHON", sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python3"))

Review Comment:
   The way the UDFs are currently implemented is really very rudimentary, and I will update a comment to reflect that. If the Python versions diverge, the executor will throw an error.
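
A driver-side guard could surface that mismatch before anything reaches the executor; the sketch below is a hypothetical illustration, not code from the PR, and clientPythonVer is an assumed input (for example taken from the serialized UDF once the proto carries the version).

// Hypothetical sketch: resolve the driver's Python the same way the quoted planner
// does, then fail fast if the client built its UDF with a different interpreter version.
import scala.sys.process._

object PythonVersionGuard {
  private lazy val driverPython: String =
    sys.env.getOrElse("PYSPARK_PYTHON", sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python3"))

  // major.minor version reported by the driver's Python executable
  private def driverPythonVersion(): String =
    Seq(driverPython, "-c", "import sys; print('%d.%d' % sys.version_info[:2])").!!.trim

  def requireCompatible(clientPythonVer: String): Unit = {
    val serverVer = driverPythonVersion()
    require(clientPythonVer == serverVer,
      s"Python UDF was built with Python $clientPythonVer but the driver resolves Python $serverVer")
  }
}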






[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978351469


##
connect/src/main/protobuf/spark/connect/types.proto:
##
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = 'proto3';
+
+package spark.connect;
+
+option java_multiple_files = true;
+option java_package = "org.apache.spark.connect.proto";
+option go_package = "github.com/databricks/spark-connect/proto";
+
+/*
+ This message describes the logical [[Type]] of something. It does not carry the value
+ itself but only describes it.
+ */
+message Type {
+  oneof kind {
+Boolean bool = 1;
+I8 i8 = 2;
+I16 i16 = 3;
+I32 i32 = 5;
+I64 i64 = 7;
+FP32 fp32 = 10;
+FP64 fp64 = 11;
+String string = 12;
+Binary binary = 13;
+Timestamp timestamp = 14;
+Date date = 16;
+Time time = 17;
+IntervalYear interval_year = 19;
+IntervalDay interval_day = 20;
+TimestampTZ timestamp_tz = 29;
+UUID uuid = 32;
+
+FixedChar fixed_char = 21;
+VarChar varchar = 22;
+FixedBinary fixed_binary = 23;
+Decimal decimal = 24;
+
+Struct struct = 25;
+List list = 27;
+Map map = 28;
+
+uint32 user_defined_type_reference = 31;
+  }
+
+  enum Nullability {
+NULLABILITY_UNSPECIFIED = 0;
+NULLABILITY_NULLABLE = 1;
+NULLABILITY_REQUIRED = 2;
+  }
+
+  message Boolean {
+uint32 type_variation_reference = 1;
+Nullability nullability = 2;
+  }

Review Comment:
   Done.
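
The Type message quoted above is what the server has to translate into Catalyst data types. A small mapping sketch, assuming the KindCase constants generated from the oneof above; the planner's own imports suggest the real mapping covers more kinds than shown here.

// Hedged sketch: map a few proto Type kinds onto Catalyst types.
import org.apache.spark.connect.proto
import org.apache.spark.sql.types._

object TypeMappingSketch {
  def toCatalystType(t: proto.Type): DataType = t.getKindCase match {
    case proto.Type.KindCase.I16    => ShortType
    case proto.Type.KindCase.I32    => IntegerType
    case proto.Type.KindCase.FP32   => FloatType
    case proto.Type.KindCase.FP64   => DoubleType
    case proto.Type.KindCase.STRING => StringType
    case proto.Type.KindCase.BINARY => BinaryType
    case other => throw new UnsupportedOperationException(s"$other not supported.")
  }
}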






[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978350959


##
connect/src/main/protobuf/spark/connect/relations.proto:
##
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = 'proto3';
+
+package spark.connect;
+
+import "spark/connect/expressions.proto";
+
+option java_multiple_files = true;
+option java_package = "org.apache.spark.connect.proto";
+option go_package = "github.com/databricks/spark-connect/proto";
+
+/*
+ The main [[Relation]] type. Fundamentally, a relation is a typed container
+ that has exactly one explicit relation type set.
+
+ When adding new relation types, they have to be registered here.
+ */
+message Relation {
+  RelationCommon common = 1;
+  oneof rel_type {
+Read read = 2;
+Project project = 3;
+Filter filter = 4;
+Join join = 5;
+Union union = 6;
+Sort sort = 7;
+Fetch fetch = 8;
+Aggregate aggregate = 9;
+SQL sql = 10;
+
+Unknown unknown = 999;
+  }
+}
+
+/*
+ * Used for testing purposes only.

Review Comment:
   I homogenized the proto comment style across all files now.






[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978348887


##
connect/src/main/protobuf/spark/connect/base.proto:
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = 'proto3';
+
+package spark.connect;
+
+import "spark/connect/commands.proto";
+import "spark/connect/relations.proto";
+
+option java_multiple_files = true;
+option java_package = "org.apache.spark.connect.proto";
+option go_package = "github.com/databricks/spark-connect/proto";

Review Comment:
   Removed all the go packages. They're not needed atm.






[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-23 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r978343068


##
connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.{Since, Unstable}
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{expressions, plans}
+import org.apache.spark.sql.catalyst.analysis.{
+  UnresolvedAlias,
+  UnresolvedAttribute,
+  UnresolvedFunction,
+  UnresolvedRelation,
+  UnresolvedStar
+}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.types.{
+  BinaryType,
+  ByteType,
+  DateType,
+  DoubleType,
+  FloatType,
+  IntegerType,
+  ShortType,
+  TimestampType
+}

Review Comment:
   I'm happy to change this in any way you want, but can you please make your comment a bit more actionable?
   
   - scalafmt produces an output that is validated in our style checks.
   - I couldn't find anything in the Spark style guide on this topic that would ban an import like this.
   
   Now my question is: what do you want the output to look like, and how do I produce that output automatically? It would be weird to request manual style adjustments that we don't verify in the build (in particular since this is new code).
   
   Thanks in advance!
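
For reference, the two import layouts in question look roughly like this (an illustration only, not a change requested in the thread):

// The vertical, one-selector-per-line layout that scalafmt emitted in the new files:
import org.apache.spark.sql.types.{
  BinaryType,
  ByteType,
  DateType
}
// versus a condensed single-line selector import of the same names:
import org.apache.spark.sql.types.{BinaryType, ByteType, DateType}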






[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977836144


##
connect/src/main/protobuf/spark/connect/relations.proto:
##
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = 'proto3';
+
+package spark.connect;
+
+import "spark/connect/expressions.proto";
+
+option java_multiple_files = true;
+option java_package = "org.apache.spark.connect.proto";
+option go_package = "github.com/databricks/spark-connect/proto";
+
+/*
+ The main [[Relation]] type. Fundamentally, a relation is a typed container
+ that has exactly one explicit relation type set.
+
+ When adding new relation types, they have to be registered here.
+ */
+message Relation {
+  RelationCommon common = 1;
+  oneof rel_type {
+Read read = 2;
+Project project = 3;
+Filter filter = 4;
+Join join = 5;
+Union union = 6;
+Sort sort = 7;
+Fetch fetch = 8;
+Aggregate aggregate = 9;
+Sql sql = 10;
+
+Unknown unknown = 999;
+  }
+}
+
+/*
+ * Used for testing purposes only.
+ */
+message Unknown {}
+
+/*
+ Common metadata of all relations.
+ */
+message RelationCommon {
+  string source_info = 1;
+  string alias = 2;
+}
+
+/*
+ Relation that uses a SQL query to generate the output.
+ */
+message Sql {

Review Comment:
   SQL



##
connect/src/main/protobuf/spark/connect/base.proto:
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = 'proto3';
+
+package spark.connect;
+
+import "spark/connect/commands.proto";
+import "spark/connect/relations.proto";
+
+option java_multiple_files = true;
+option java_package = "org.apache.spark.connect.proto";
+option go_package = "github.com/databricks/spark-connect/proto";
+
+
+// A [[Plan]] is the structure that carries the runtime information for the execution from the
+// client to the server. A [[Plan]] can either be of the type [[Relation]] which is a reference
+// to the underlying logical plan or it can be of the [[Command]] type that is used to execute
+// commands on the server.
+message Plan {
+  oneof op_type {
+Relation root = 1;
+Command command = 2;
+  }
+}
+
+// A request to be executed by the service.
+message Request {
+  // The client_id is set by the client to be able to collate streaming responses from
+  // different queries.
+  string client_id = 1;
+  // User context
+  UserContext user_context = 2;
+  // The logical plan to be executed / analyzed.
+  Plan plan = 3;
+
+  // User Context is used to refer to one particular user session that is executing
+  // queries in the backend.
+  message UserContext {
+string user_id = 1;
+string user_name = 2;
+  }
+}
+
+// The response of a query, can be one or more for each request. Responses belonging to the
+// same input query, carry the same `client_id`.
+message Response {
+  string client_id = 1;
+
+  // Result type
+  oneof result_type {
+ArrowBatch batch = 2;
+CSVBatch csv_batch = 3;
+  }
+
+  // Metrics for the query execution. Typically, this field is only present in the last
+  // batch of results and then represent the overall state of the query execution.
+  Metrics metrics = 4;
+
+  // Batch results of metrics.
+  message ArrowBatch {
+int64 row_count = 1;
+int64 uncompressed_bytes = 2;
+int64 compressed_bytes = 3;
+bytes data = 4;
+bytes schema = 5;
+  }
+
+  message CSVBatch {

Review Comment:
   Done
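
For illustration only (not part of the diff): a hedged Python sketch of assembling a Request as defined above, reusing the assumed Read/named_table shape from the previous sketch.

```python
import pyspark.sql.connect.proto as proto

req = proto.Request()
req.client_id = "session-42"          # used to collate streaming responses per query
req.user_context.user_id = "martin"   # UserContext is the message nested in Request
# Plan.op_type is a oneof: either a root Relation or a Command.
req.plan.root.read.named_table.parts.extend(["my_db", "my_table"])

assert req.plan.WhichOneof("op_type") == "root"
```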

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977660209


##
python/pyspark/sql/connect/plan.py:
##
@@ -0,0 +1,468 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import (
+List,
+Optional,
+Sequence,
+Tuple,
+Union,
+cast,
+TYPE_CHECKING,
+)
+
+import pyspark.sql.connect.proto as proto
+from pyspark.sql.connect.column import (
+ColumnOrString,
+ColumnRef,
+Expression,
+ExpressionOrString,
+SortOrder,
+)
+
+
+if TYPE_CHECKING:
+from pyspark.sql.connect.client import RemoteSparkSession
+
+
+class InputValidationError(Exception):
+pass
+
+
+class LogicalPlan(object):
+
+INDENT = 2
+
+def __init__(self, child: Optional["LogicalPlan"]) -> None:
+self._child = child
+
+def unresolved_attr(self, *colNames: str) -> proto.Expression:
+"""Creates an unresolved attribute from a column name."""
+exp = proto.Expression()
+exp.unresolved_attribute.parts.extend(list(colNames))
+return exp
+
+def to_attr_or_expression(
+self, col: ColumnOrString, session: "RemoteSparkSession"
+) -> proto.Expression:
+"""Returns either an instance of an unresolved attribute or the 
serialized
+expression value of the column."""
+if type(col) is str:
+return self.unresolved_attr(cast(str, col))
+else:
+return cast(ColumnRef, col).to_plan(session)
+
+def plan(self, session: "RemoteSparkSession") -> proto.Relation:
+...
+
+def _verify(self, session: "RemoteSparkSession") -> bool:
+"""This method is used to verify that the current logical plan
+can be serialized to Proto and back and afterwards is identical."""
+plan = proto.Plan()
+plan.root.CopyFrom(self.plan(session))
+
+serialized_plan = plan.SerializeToString()
+test_plan = proto.Plan()
+test_plan.ParseFromString(serialized_plan)
+
+return test_plan == plan
+
+# TODO(martin.grund) explain , schema
+def collect(self, session: "RemoteSparkSession" = None, debug: bool = False):
+plan = proto.Plan()
+plan.root.CopyFrom(self.plan(session))
+
+if debug:
+print(plan)
+
+return plan
+
+def _i(self, indent) -> str:

Review Comment:
   Removed the function.
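
For illustration only (not part of the diff): a small usage sketch for the LogicalPlan classes above. `session` is left as None, which the quoted Read.plan() and _verify() code tolerates because it never dereferences the session.

```python
from pyspark.sql.connect.plan import Read

read_plan = Read("my_db.my_table")
proto_plan = read_plan.collect(session=None, debug=True)  # builds (and prints) the proto Plan
assert read_plan._verify(None)  # round-trips the plan through serialization and compares
```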



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977656498


##
python/pyspark/sql/connect/plan.py:
##
@@ -0,0 +1,468 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import (
+List,
+Optional,
+Sequence,
+Tuple,
+Union,
+cast,
+TYPE_CHECKING,
+)
+
+import pyspark.sql.connect.proto as proto
+from pyspark.sql.connect.column import (
+ColumnOrString,
+ColumnRef,
+Expression,
+ExpressionOrString,
+SortOrder,
+)
+
+
+if TYPE_CHECKING:
+from pyspark.sql.connect.client import RemoteSparkSession
+
+
+class InputValidationError(Exception):
+pass
+
+
+class LogicalPlan(object):
+
+INDENT = 2
+
+def __init__(self, child: Optional["LogicalPlan"]) -> None:
+self._child = child
+
+def unresolved_attr(self, *colNames: str) -> proto.Expression:
+"""Creates an unresolved attribute from a column name."""
+exp = proto.Expression()
+exp.unresolved_attribute.parts.extend(list(colNames))
+return exp
+
+def to_attr_or_expression(
+self, col: ColumnOrString, session: "RemoteSparkSession"
+) -> proto.Expression:
+"""Returns either an instance of an unresolved attribute or the 
serialized
+expression value of the column."""
+if type(col) is str:
+return self.unresolved_attr(cast(str, col))
+else:
+return cast(ColumnRef, col).to_plan(session)
+
+def plan(self, session: "RemoteSparkSession") -> proto.Relation:
+...
+
+def _verify(self, session: "RemoteSparkSession") -> bool:
+"""This method is used to verify that the current logical plan
+can be serialized to Proto and back and afterwards is identical."""
+plan = proto.Plan()
+plan.root.CopyFrom(self.plan(session))
+
+serialized_plan = plan.SerializeToString()
+test_plan = proto.Plan()
+test_plan.ParseFromString(serialized_plan)
+
+return test_plan == plan
+
+# TODO(martin.grund) explain , schema

Review Comment:
   Explain and Schema are already part of the MVP in the DataFrame.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977655361


##
python/pyspark/sql/connect/README.md:
##
@@ -0,0 +1,34 @@
+
+# [EXPERIMENTAL] Spark Connect
+
+**Spark Connect is a strictly experimental feature and under heavy development.
+All APIs should be considered volatile and should not be used in production.**
+
+This module contains the implementation of Spark Connect which is a logical plan
+facade for the implementation in Spark. Spark Connect is directly integrated into the build
+of Spark. To enable it, you only need to activate the driver plugin for Spark Connect.
+
+
+
+
+## Build
+
+1. Build Spark as usual per the documentation.
+2. Build and package the Spark Connect package
+   ```commandline

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977654821


##
python/pyspark/sql/connect/readwriter.py:
##
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark.sql.connect.data_frame import DataFrame
+from pyspark.sql.connect.plan import Read
+
+
+class DataFrameReader:
+def __init__(self, client):
+self._client = client
+
+def table(self, tableName: str) -> "DataFrame":
+df = DataFrame.withPlan(Read(tableName), self._client)
+return df

Review Comment:
   Done.
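
For illustration only (not part of the diff): the reader simply wraps a table name in a Read plan; the `client` argument would normally be a RemoteSparkSession.

```python
from pyspark.sql.connect.readwriter import DataFrameReader

reader = DataFrameReader(client=None)   # a RemoteSparkSession would be passed here
df = reader.table("my_db.my_table")     # DataFrame.withPlan(Read("my_db.my_table"), client)
```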



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977652216


##
python/pyspark/sql/connect/plan.py:
##
@@ -0,0 +1,468 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import (
+List,
+Optional,
+Sequence,
+Tuple,
+Union,
+cast,
+TYPE_CHECKING,
+)
+
+import pyspark.sql.connect.proto as proto
+from pyspark.sql.connect.column import (
+ColumnOrString,
+ColumnRef,
+Expression,
+ExpressionOrString,
+SortOrder,
+)
+
+
+if TYPE_CHECKING:
+from pyspark.sql.connect.client import RemoteSparkSession
+
+
+class InputValidationError(Exception):
+pass
+
+
+class LogicalPlan(object):
+
+INDENT = 2
+
+def __init__(self, child: Optional["LogicalPlan"]) -> None:
+self._child = child
+
+def unresolved_attr(self, *colNames: str) -> proto.Expression:
+"""Creates an unresolved attribute from a column name."""
+exp = proto.Expression()
+exp.unresolved_attribute.parts.extend(list(colNames))
+return exp
+
+def to_attr_or_expression(
+self, col: ColumnOrString, session: "RemoteSparkSession"
+) -> proto.Expression:
+"""Returns either an instance of an unresolved attribute or the 
serialized
+expression value of the column."""
+if type(col) is str:
+return self.unresolved_attr(cast(str, col))
+else:
+return cast(ColumnRef, col).to_plan(session)
+
+def plan(self, session: "RemoteSparkSession") -> proto.Relation:
+...
+
+def _verify(self, session: "RemoteSparkSession") -> bool:
+"""This method is used to verify that the current logical plan
+can be serialized to Proto and back and afterwards is identical."""
+plan = proto.Plan()
+plan.root.CopyFrom(self.plan(session))
+
+serialized_plan = plan.SerializeToString()
+test_plan = proto.Plan()
+test_plan.ParseFromString(serialized_plan)
+
+return test_plan == plan
+
+# TODO(martin.grund) explain , schema
+def collect(self, session: "RemoteSparkSession" = None, debug: bool = False):
+plan = proto.Plan()
+plan.root.CopyFrom(self.plan(session))
+
+if debug:
+print(plan)
+
+return plan
+
+def _i(self, indent) -> str:
+return " " * indent
+
+def print(self, indent=0) -> str:
+...
+
+def _repr_html_(self):
+...
+
+
+class Read(LogicalPlan):
+def __init__(self, table_name: str) -> None:
+super().__init__(None)
+self.table_name = table_name
+
+def plan(self, session: "RemoteSparkSession") -> proto.Relation:
+plan = proto.Relation()
+plan.read.named_table.parts.extend(self.table_name.split("."))
+return plan
+
+def print(self, indent=0) -> str:
+return f"{self._i(indent)}\n"
+
+def _repr_html_(self):
+return f"""
+
+
+Read
+table name: {self.table_name}
+
+
+"""
+
+
+class Project(LogicalPlan):
+"""Logical plan object for a projection.
+
+All input arguments are directly serialized into the corresponding protocol buffer
+objects. This class only provides very limited error handling and input validation.
+
+To be compatible with PySpark, we validate that the input arguments are all
+expressions to be able to serialize them to the server.
+
+"""
+
+def __init__(self, child: Optional["LogicalPlan"], *columns: ExpressionOrString) -> None:
+super().__init__(child)
+self._raw_columns = list(columns)
+self.alias = None
+self._verify_expressions()
+
+def _verify_expressions(self):
+"""Ensures that all input arguments are instances of Expression."""
+for c in self._raw_columns:
+if not isinstance(c, Expression):
+raise InputValidationError(f"Only Expressions can be used for 
projections: '{c}'.")
+
+def withAlias(self, alias) -> LogicalPlan:
+self.alias = alias
+return self
+
+def plan(self, session: "RemoteSparkSession") -> proto.Relation:
+assert 

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977651147


##
python/pyspark/sql/connect/plan.py:
##
@@ -0,0 +1,468 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import (
+List,
+Optional,
+Sequence,
+Tuple,
+Union,
+cast,
+TYPE_CHECKING,
+)
+
+import pyspark.sql.connect.proto as proto
+from pyspark.sql.connect.column import (
+ColumnOrString,
+ColumnRef,
+Expression,
+ExpressionOrString,
+SortOrder,
+)
+
+
+if TYPE_CHECKING:
+from pyspark.sql.connect.client import RemoteSparkSession
+
+
+class InputValidationError(Exception):
+pass
+
+
+class LogicalPlan(object):
+
+INDENT = 2
+
+def __init__(self, child: Optional["LogicalPlan"]) -> None:
+self._child = child
+
+def unresolved_attr(self, *colNames: str) -> proto.Expression:
+"""Creates an unresolved attribute from a column name."""
+exp = proto.Expression()
+exp.unresolved_attribute.parts.extend(list(colNames))
+return exp
+
+def to_attr_or_expression(
+self, col: ColumnOrString, session: "RemoteSparkSession"
+) -> proto.Expression:
+"""Returns either an instance of an unresolved attribute or the 
serialized
+expression value of the column."""
+if type(col) is str:
+return self.unresolved_attr(cast(str, col))
+else:
+return cast(ColumnRef, col).to_plan(session)
+
+def plan(self, session: "RemoteSparkSession") -> proto.Relation:
+...
+
+def _verify(self, session: "RemoteSparkSession") -> bool:
+"""This method is used to verify that the current logical plan
+can be serialized to Proto and back and afterwards is identical."""
+plan = proto.Plan()
+plan.root.CopyFrom(self.plan(session))
+
+serialized_plan = plan.SerializeToString()
+test_plan = proto.Plan()
+test_plan.ParseFromString(serialized_plan)
+
+return test_plan == plan
+
+# TODO(martin.grund) explain , schema
+def collect(self, session: "RemoteSparkSession" = None, debug: bool = False):
+plan = proto.Plan()
+plan.root.CopyFrom(self.plan(session))
+
+if debug:
+print(plan)
+
+return plan
+
+def _i(self, indent) -> str:
+return " " * indent
+
+def print(self, indent=0) -> str:
+...
+
+def _repr_html_(self):
+...
+
+
+class Read(LogicalPlan):
+def __init__(self, table_name: str) -> None:
+super().__init__(None)
+self.table_name = table_name
+
+def plan(self, session: "RemoteSparkSession") -> proto.Relation:
+plan = proto.Relation()
+plan.read.named_table.parts.extend(self.table_name.split("."))
+return plan
+
+def print(self, indent=0) -> str:
+return f"{self._i(indent)}\n"
+
+def _repr_html_(self):
+return f"""
+
+
+Read
+table name: {self.table_name}
+
+
+"""
+
+
+class Project(LogicalPlan):
+"""Logical plan object for a projection.
+
+All input arguments are directly serialized into the corresponding protocol buffer
+objects. This class only provides very limited error handling and input validation.
+
+To be compatible with PySpark, we validate that the input arguments are all
+expressions to be able to serialize them to the server.
+
+"""
+
+def __init__(self, child: Optional["LogicalPlan"], *columns: ExpressionOrString) -> None:
+super().__init__(child)
+self._raw_columns = list(columns)
+self.alias = None
+self._verify_expressions()
+
+def _verify_expressions(self):
+"""Ensures that all input arguments are instances of Expression."""
+for c in self._raw_columns:
+if not isinstance(c, Expression):
+raise InputValidationError(f"Only Expressions can be used for 
projections: '{c}'.")
+
+def withAlias(self, alias) -> LogicalPlan:
+self.alias = alias
+return self
+
+def plan(self, session: "RemoteSparkSession") -> proto.Relation:
+assert 

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977650177


##
python/pyspark/sql/connect/functions.py:
##
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from pyspark.sql.connect.column import ColumnRef, LiteralExpression
+from pyspark.sql.connect.column import PrimitiveType
+
+

Review Comment:
   SPARK-40538



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977648488


##
python/pyspark/sql/connect/data_frame.py:
##
@@ -0,0 +1,241 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import (
+Any,
+Dict,
+List,
+Optional,
+Sequence,
+Tuple,
+Union,
+cast,
+TYPE_CHECKING,
+)
+
+import pyspark.sql.connect.plan as plan
+from pyspark.sql.connect.column import (
+ColumnOrString,
+ColumnRef,
+Expression,
+ExpressionOrString,
+LiteralExpression,
+)
+
+if TYPE_CHECKING:
+from pyspark.sql.connect.client import RemoteSparkSession
+
+
+ColumnOrName = Union[ColumnRef, str]
+
+
+class GroupingFrame(object):
+
+MeasuresType = Union[Sequence[Tuple[ExpressionOrString, str]], Dict[str, str]]
+OptMeasuresType = Optional[MeasuresType]
+
+def __init__(self, df: "DataFrame", *grouping_cols: Union[ColumnRef, str]) -> None:
+self._df = df
+self._grouping_cols = [x if isinstance(x, ColumnRef) else df[x] for x in grouping_cols]
+
+def agg(self, exprs: MeasuresType = None) -> "DataFrame":
+
+# Normalize the dictionary into a list of tuples.
+if isinstance(exprs, Dict):
+measures = list(exprs.items())
+elif isinstance(exprs, List):
+measures = exprs
+else:
+measures = []
+
+res = DataFrame.withPlan(
+plan.Aggregate(
+child=self._df._plan,
+grouping_cols=self._grouping_cols,
+measures=measures,
+),
+session=self._df._session,
+)
+return res
+
+def _map_cols_to_dict(self, fun: str, cols: List[Union[ColumnRef, str]]) -> Dict[str, str]:
+return {x if isinstance(x, str) else cast(ColumnRef, x).name(): fun for x in cols}
+
+def min(self, *cols: Union[ColumnRef, str]) -> "DataFrame":
+expr = self._map_cols_to_dict("min", list(cols))
+return self.agg(expr)
+
+def max(self, *cols: Union[ColumnRef, str]) -> "DataFrame":
+expr = self._map_cols_to_dict("max", list(cols))
+return self.agg(expr)
+
+def sum(self, *cols: Union[ColumnRef, str]) -> "DataFrame":
+expr = self._map_cols_to_dict("sum", list(cols))
+return self.agg(expr)
+
+def count(self) -> "DataFrame":
+return self.agg([(LiteralExpression(1), "count")])
+
+
+class DataFrame(object):
+"""Every DataFrame object essentially is a Relation that is refined using 
the
+member functions. Calling a method on a dataframe will essentially return 
a copy
+of the DataFrame with the changes applied.
+"""
+
+def __init__(self, data: List[Any] = None, schema: List[str] = None):
+"""Creates a new data frame"""
+self._schema = schema
+self._plan: Optional[plan.LogicalPlan] = None
+self._cache: Dict[str, Any] = {}
+self._session: "RemoteSparkSession" = None
+
+@classmethod
+def withPlan(cls, plan: plan.LogicalPlan, session=None) -> "DataFrame":
+"""Main initialization method used to construct a new data frame with 
a child plan."""
+new_frame = DataFrame()
+new_frame._plan = plan
+new_frame._session = session
+return new_frame
+
+def select(self, *cols: ColumnRef) -> "DataFrame":
+return DataFrame.withPlan(plan.Project(self._plan, *cols), session=self._session)
+
+def agg(self, exprs: Dict[str, str]) -> "DataFrame":
+return self.groupBy().agg(exprs)
+
+def alias(self, alias):
+return DataFrame.withPlan(plan.Project(self._plan).withAlias(alias), session=self._session)
+
+def approxQuantile(self, col, probabilities, relativeError):
+...
+
+def colRegex(self, regex) -> "DataFrame":
+# TODO needs analysis to pick the right column
+...
+
+@property
+def columns(self) -> List[str]:
+"""Returns the list of columns of the current data frame."""
+if self._plan is None:
+return []
+if "columns" not in self._cache and self._plan is not None:
+pdd = self.limit(0).collect()
+# Translate to standard pytho 

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977647168


##
project/SparkBuild.scala:
##
@@ -1031,12 +1105,13 @@ object Unidoc {
   Seq (
 publish := {},
 
+

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977646112


##
python/pyspark/sql/connect/function_builder.py:
##
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import functools
+from typing import TYPE_CHECKING
+
+import pyspark.sql.types
+from pyspark.sql.connect.column import (
+ColumnOrString,
+ColumnRef,
+Expression,
+ExpressionOrString,
+ScalarFunctionExpression,
+)
+
+if TYPE_CHECKING:
+from pyspark.sql.connect.client import RemoteSparkSession
+
+
+def _build(name: str, *args: ExpressionOrString) -> ScalarFunctionExpression:
+"""
+Simple wrapper function that converts the arguments into the appropriate types.
+Parameters
+--
+name Name of the function to be called.
+args The list of arguments.
+
+Returns
+---
+:class:`ScalarFunctionExpression`
+"""
+cols = [x if isinstance(x, Expression) else ColumnRef.from_qualified_name(x) for x in args]
+return ScalarFunctionExpression(name, *cols)
+
+
+class FunctionBuilder:
+"""This class is used to build arbitrary functions used in expressions"""
+
+def __getattr__(self, name):
+def _(*args: ExpressionOrString) -> ScalarFunctionExpression:
+return _build(name, *args)
+
+_.__doc__ = f"""Function to apply {name}"""
+return _
+
+
+functions = FunctionBuilder()
+
+
+class UserDefinedFunction(Expression):
+"""A user defied function is an expresison that has a reference to the 
actual
+Python callable attached. During plan generation, the client sends a 
command to
+the server to register the UDF before execution. The expression object can 
be
+reused and is not attached to a specific execution. If the internal name of
+the temporary function is set, it is assumed that the registration has 
already
+happened."""
+
+def __init__(self, func, return_type=pyspark.sql.types.StringType(), args=None):
+super().__init__()
+
+self._func_ref = func
+self._return_type = return_type
+self._args = list(args)
+self._func_name = None
+
+def to_plan(self, session: "RemoteSparkSession") -> Expression:
+# Needs to materialize the UDF to the server
+# Only do this once per session
+func_name = session.register_udf(self._func_ref, self._return_type)
+# Func name is used for the actual reference
+return _build(func_name, *self._args).to_plan(session)
+
+def __str__(self):
+return f"UserDefinedFunction({self._func_name})"
+
+
+def _create_udf(function, return_type):
+def wrapper(*cols: "ColumnOrString"):
+return UserDefinedFunction(func=function, return_type=return_type, args=cols)
+
+return wrapper
+
+
+def udf(function, return_type=pyspark.sql.types.StringType()):
+"""
+Returns a callable that represents the column ones arguments are applied

Review Comment:
   I fixed the sentence -> "ones" -> "once"
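
For illustration only (not part of the diff): a hedged sketch of the builder API above. Attribute access on `functions` produces a ScalarFunctionExpression, and udf() wraps a plain Python callable; the actual registration only happens in to_plan() against a RemoteSparkSession.

```python
from pyspark.sql.connect.function_builder import functions, udf

expr = functions.upper("name")   # ScalarFunctionExpression("upper", ColumnRef for "name")

def exclaim(s):
    return s + "!"

exclaim_udf = udf(exclaim)       # return type defaults to StringType()
col_expr = exclaim_udf("name")   # UserDefinedFunction over the "name" column, registered lazily
```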



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977644784


##
python/pyspark/sql/connect/data_frame.py:
##
@@ -0,0 +1,241 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import (
+Any,
+Dict,
+List,
+Optional,
+Sequence,
+Tuple,
+Union,
+cast,
+TYPE_CHECKING,
+)
+
+import pyspark.sql.connect.plan as plan
+from pyspark.sql.connect.column import (
+ColumnOrString,
+ColumnRef,
+Expression,
+ExpressionOrString,
+LiteralExpression,
+)
+
+if TYPE_CHECKING:
+from pyspark.sql.connect.client import RemoteSparkSession
+
+
+ColumnOrName = Union[ColumnRef, str]
+
+
+class GroupingFrame(object):
+
+MeasuresType = Union[Sequence[Tuple[ExpressionOrString, str]], Dict[str, str]]
+OptMeasuresType = Optional[MeasuresType]
+
+def __init__(self, df: "DataFrame", *grouping_cols: Union[ColumnRef, str]) -> None:
+self._df = df
+self._grouping_cols = [x if isinstance(x, ColumnRef) else df[x] for x in grouping_cols]
+
+def agg(self, exprs: MeasuresType = None) -> "DataFrame":
+
+# Normalize the dictionary into a list of tuples.
+if isinstance(exprs, Dict):
+measures = list(exprs.items())
+elif isinstance(exprs, List):
+measures = exprs
+else:
+measures = []
+
+res = DataFrame.withPlan(
+plan.Aggregate(
+child=self._df._plan,
+grouping_cols=self._grouping_cols,
+measures=measures,
+),
+session=self._df._session,
+)
+return res
+
+def _map_cols_to_dict(self, fun: str, cols: List[Union[ColumnRef, str]]) -> Dict[str, str]:
+return {x if isinstance(x, str) else cast(ColumnRef, x).name(): fun for x in cols}
+
+def min(self, *cols: Union[ColumnRef, str]) -> "DataFrame":
+expr = self._map_cols_to_dict("min", list(cols))
+return self.agg(expr)
+
+def max(self, *cols: Union[ColumnRef, str]) -> "DataFrame":
+expr = self._map_cols_to_dict("max", list(cols))
+return self.agg(expr)
+
+def sum(self, *cols: Union[ColumnRef, str]) -> "DataFrame":
+expr = self._map_cols_to_dict("sum", list(cols))
+return self.agg(expr)
+
+def count(self) -> "DataFrame":
+return self.agg([(LiteralExpression(1), "count")])
+
+
+class DataFrame(object):
+"""Every DataFrame object essentially is a Relation that is refined using 
the
+member functions. Calling a method on a dataframe will essentially return 
a copy
+of the DataFrame with the changes applied.
+"""
+
+def __init__(self, data: List[Any] = None, schema: List[str] = None):
+"""Creates a new data frame"""
+self._schema = schema
+self._plan: Optional[plan.LogicalPlan] = None
+self._cache: Dict[str, Any] = {}
+self._session: "RemoteSparkSession" = None
+
+@classmethod
+def withPlan(cls, plan: plan.LogicalPlan, session=None) -> "DataFrame":
+"""Main initialization method used to construct a new data frame with 
a child plan."""
+new_frame = DataFrame()
+new_frame._plan = plan
+new_frame._session = session
+return new_frame
+
+def select(self, *cols: ColumnRef) -> "DataFrame":
+return DataFrame.withPlan(plan.Project(self._plan, *cols), session=self._session)
+
+def agg(self, exprs: Dict[str, str]) -> "DataFrame":
+return self.groupBy().agg(exprs)
+
+def alias(self, alias):
+return DataFrame.withPlan(plan.Project(self._plan).withAlias(alias), session=self._session)
+
+def approxQuantile(self, col, probabilities, relativeError):
+...
+
+def colRegex(self, regex) -> "DataFrame":
+# TODO needs analysis to pick the right column
+...
+
+@property
+def columns(self) -> List[str]:
+"""Returns the list of columns of the current data frame."""
+if self._plan is None:
+return []
+if "columns" not in self._cache and self._plan is not None:
+pdd = self.limit(0).collect()
+# Translate to standard pytho 

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977644181


##
python/pyspark/sql/connect/data_frame.py:
##
@@ -0,0 +1,241 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import (
+Any,
+Dict,
+List,
+Optional,
+Sequence,
+Tuple,
+Union,
+cast,
+TYPE_CHECKING,
+)
+
+import pyspark.sql.connect.plan as plan
+from pyspark.sql.connect.column import (
+ColumnOrString,
+ColumnRef,
+Expression,
+ExpressionOrString,
+LiteralExpression,
+)
+
+if TYPE_CHECKING:
+from pyspark.sql.connect.client import RemoteSparkSession
+
+
+ColumnOrName = Union[ColumnRef, str]
+
+
+class GroupingFrame(object):
+
+MeasuresType = Union[Sequence[Tuple[ExpressionOrString, str]], Dict[str, str]]
+OptMeasuresType = Optional[MeasuresType]
+
+def __init__(self, df: "DataFrame", *grouping_cols: Union[ColumnRef, str]) -> None:
+self._df = df
+self._grouping_cols = [x if isinstance(x, ColumnRef) else df[x] for x in grouping_cols]
+
+def agg(self, exprs: MeasuresType = None) -> "DataFrame":
+
+# Normalize the dictionary into a list of tuples.
+if isinstance(exprs, Dict):
+measures = list(exprs.items())
+elif isinstance(exprs, List):
+measures = exprs
+else:
+measures = []
+
+res = DataFrame.withPlan(
+plan.Aggregate(
+child=self._df._plan,
+grouping_cols=self._grouping_cols,
+measures=measures,
+),
+session=self._df._session,
+)
+return res
+
+def _map_cols_to_dict(self, fun: str, cols: List[Union[ColumnRef, str]]) -> Dict[str, str]:
+return {x if isinstance(x, str) else cast(ColumnRef, x).name(): fun for x in cols}
+
+def min(self, *cols: Union[ColumnRef, str]) -> "DataFrame":
+expr = self._map_cols_to_dict("min", list(cols))
+return self.agg(expr)
+
+def max(self, *cols: Union[ColumnRef, str]) -> "DataFrame":
+expr = self._map_cols_to_dict("max", list(cols))
+return self.agg(expr)
+
+def sum(self, *cols: Union[ColumnRef, str]) -> "DataFrame":
+expr = self._map_cols_to_dict("sum", list(cols))
+return self.agg(expr)
+
+def count(self) -> "DataFrame":
+return self.agg([(LiteralExpression(1), "count")])
+
+
+class DataFrame(object):
+"""Every DataFrame object essentially is a Relation that is refined using 
the
+member functions. Calling a method on a dataframe will essentially return 
a copy
+of the DataFrame with the changes applied.
+"""
+
+def __init__(self, data: List[Any] = None, schema: List[str] = None):
+"""Creates a new data frame"""
+self._schema = schema
+self._plan: Optional[plan.LogicalPlan] = None
+self._cache: Dict[str, Any] = {}
+self._session: "RemoteSparkSession" = None
+
+@classmethod
+def withPlan(cls, plan: plan.LogicalPlan, session=None) -> "DataFrame":
+"""Main initialization method used to construct a new data frame with 
a child plan."""
+new_frame = DataFrame()
+new_frame._plan = plan
+new_frame._session = session
+return new_frame
+
+def select(self, *cols: ColumnRef) -> "DataFrame":
+return DataFrame.withPlan(plan.Project(self._plan, *cols), session=self._session)
+
+def agg(self, exprs: Dict[str, str]) -> "DataFrame":
+return self.groupBy().agg(exprs)
+
+def alias(self, alias):
+return DataFrame.withPlan(plan.Project(self._plan).withAlias(alias), session=self._session)
+
+def approxQuantile(self, col, probabilities, relativeError):
+...
+
+def colRegex(self, regex) -> "DataFrame":
+# TODO needs analysis to pick the right column

Review Comment:
   I removed the implementation, so that it's just empty for now.
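
For illustration only (not part of the diff): a sketch of how the classes above compose, with the RemoteSparkSession stubbed out as None.

```python
import pyspark.sql.connect.plan as plan
from pyspark.sql.connect.data_frame import DataFrame, GroupingFrame

df = DataFrame.withPlan(plan.Read("my_db.my_table"), session=None)

# agg() normalizes a {column: function} dict into (expression, function) tuples and wraps
# the child plan in a plan.Aggregate node.
summed = GroupingFrame(df).agg({"amount": "sum"})
counted = GroupingFrame(df).count()   # becomes [(LiteralExpression(1), "count")]
```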



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, 

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977643521


##
python/pyspark/sql/connect/column.py:
##
@@ -0,0 +1,181 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import List, Union, cast, get_args, TYPE_CHECKING
+
+import pyspark.sql.connect.proto as proto
+
+PrimitiveType = Union[str, int, bool, float]
+ExpressionOrString = Union[str, "Expression"]
+ColumnOrString = Union[str, "ColumnRef"]
+
+if TYPE_CHECKING:
+from pyspark.sql.connect.client import RemoteSparkSession
+import pyspark.sql.connect.proto as proto
+
+
+class Expression(object):
+"""
+Expression base class.
+"""
+
+def __init__(self) -> None:  # type: ignore[name-defined]
+pass
+
+def to_plan(self, session: "RemoteSparkSession") -> "proto.Expression":  # type: ignore
+...
+
+def __str__(self) -> str:
+...
+
+
+class LiteralExpression(Expression):
+"""A literal expression.
+
+The Python types are converted best effort into the relevant proto types. On the Spark Connect
+server side, the proto types are converted to the Catalyst equivalents."""
+
+def __init__(self, value: PrimitiveType) -> None:  # type: ignore[name-defined]
+super().__init__()
+self._value = value
+
+def to_plan(self, session: "RemoteSparkSession") -> "proto.Expression":
+"""Converts the literal expression to the literal in proto.
+
+TODO This method always assumes the largest type and can thus

Review Comment:
   Done. SPARK-40533
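
For illustration only (not part of the diff): constructing a literal, assuming the module path from the diff.

```python
from pyspark.sql.connect.column import LiteralExpression

lit = LiteralExpression(3.14)   # PrimitiveType covers str, int, bool, float
# lit.to_plan(session) would emit the proto literal; per the TODO above, the conversion
# currently assumes the widest matching proto type for the Python value.
```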



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977642207


##
python/pyspark/sql/connect/README.md:
##
@@ -0,0 +1,34 @@
+
+# [EXPERIMENTAL] Spark Connect
+
+**Spark Connect is a strictly experimental feature and under heavy development.
+All APIs should be considered volatile and should not be used in production.**
+
+This module contains the implementation of Spark Connect which is a logical plan
+facade for the implementation in Spark. Spark Connect is directly integrated into the build
+of Spark. To enable it, you only need to activate the driver plugin for Spark Connect.
+
+
+
+
+## Build
+
+1. Build Spark as usual per the documentation.
+2. Build and package the Spark Connect package
+   ```commandline
+   ./build/mvn package

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977641208


##
python/mypy.ini:
##
@@ -23,6 +23,16 @@ show_error_codes = True
 warn_unused_ignores = True
 warn_redundant_casts = True
 
+[mypy-pyspark.sql.connect.*]

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977639594


##
project/SparkBuild.scala:
##
@@ -593,6 +608,60 @@ object Core {
   )
 }
 
+
+object SparkConnect {
+
+  import BuildCommons.protoVersion
+
+  private val shadePrefix = "org.sparkproject.connect"
+  val shadeJar = taskKey[Unit]("Shade the Jars")
+
+  lazy val settings = Seq(
+// Setting version for the protobuf compiler. This has to be propagated to every sub-project
+// even if the project is not using it.
+PB.protocVersion := BuildCommons.protoVersion,
+
+// For some reason the resolution from the imported Maven build does not work for some
+// of these dependencies that we need to shade later on.
+libraryDependencies ++= Seq(
+  "io.grpc"  % "protoc-gen-grpc-java" % BuildCommons.gprcVersion asProtocPlugin(),
+  "org.scala-lang" % "scala-library" % "2.12.16" % "provided",
+  "com.google.guava" % "guava"% "31.0.1-jre",
+  "com.google.guava" % "failureaccess"% "1.0.1",
+  "com.google.protobuf" % "protobuf-java"% protoVersion % "protobuf"
+),
+
+dependencyOverrides ++= Seq(
+  "com.google.guava" % "guava"% "31.0.1-jre",
+  "com.google.guava" % "failureaccess"% "1.0.1",
+  "com.google.protobuf" % "protobuf-java"% protoVersion
+),
+
+(Compile / PB.targets) := Seq(
+  PB.gens.java-> (Compile / sourceManaged).value,
+  PB.gens.plugin("grpc-java") -> (Compile / sourceManaged).value
+),
+
+(assembly / test) := false,
+
+(assembly / logLevel) := Level.Info,
+
+(assembly / assemblyShadeRules) := Seq(
+  ShadeRule.rename("io.grpc.**" -> 
"org.sparkproject.connect.grpc.@0").inAll,
+  ShadeRule.rename("com.google.common.**"-> 
"org.sparkproject.connect.guava.@1").inAll,
+  ShadeRule.rename("com.google.thirdparty.**"-> 
"org.sparkproject.connect.guava.@1").inAll,
+  ShadeRule.rename("com.google.protobuf.**"-> 
"org.sparkproject.connect.protobuf.@1").inAll,

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977638831


##
project/SparkBuild.scala:
##
@@ -593,6 +608,60 @@ object Core {
   )
 }
 
+
+object SparkConnect {
+
+  import BuildCommons.protoVersion
+
+  private val shadePrefix = "org.sparkproject.connect"
+  val shadeJar = taskKey[Unit]("Shade the Jars")
+
+  lazy val settings = Seq(
+// Setting version for the protobuf compiler. This has to be propagated to every sub-project
+// even if the project is not using it.
+PB.protocVersion := BuildCommons.protoVersion,
+
+// For some reason the resolution from the imported Maven build does not work for some
+// of these dependencies that we need to shade later on.
+libraryDependencies ++= Seq(
+  "io.grpc"  % "protoc-gen-grpc-java" % BuildCommons.gprcVersion asProtocPlugin(),
+  "org.scala-lang" % "scala-library" % "2.12.16" % "provided",
+  "com.google.guava" % "guava"% "31.0.1-jre",

Review Comment:
   I "re-formatted" this, please let me know if it matches your expectations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977637347


##
dev/tox.ini:
##
@@ -51,4 +51,6 @@ exclude =
 python/pyspark/worker.pyi,
 python/pyspark/java_gateway.pyi,
 dev/ansible-for-test-node/*,
+python/pyspark/sql/connect/proto/*,
+python/venv/*,

Review Comment:
   I removed the venv. The proto is because of the comment above. The venv is 
what I use for testing and the linter will complain about my local python 
virtualenv. I removed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977636376


##
dev/tox.ini:
##
@@ -51,4 +51,6 @@ exclude =
 python/pyspark/worker.pyi,
 python/pyspark/java_gateway.pyi,
 dev/ansible-for-test-node/*,
+python/pyspark/sql/connect/proto/*,

Review Comment:
   This is generated code that we cannot re-enable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977634110


##
dev/infra/Dockerfile:
##
@@ -65,3 +65,6 @@ RUN Rscript -e "devtools::install_version('roxygen2', 
version='7.2.0', repos='ht
 
 # See more in SPARK-39735
 ENV R_LIBS_SITE 
"/usr/local/lib/R/site-library:${R_LIBS_SITE}:/usr/lib/R/library"
+
+# Add Python Deps for Spark Connect.

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977633804


##
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.ByteString
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{Request, Response}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.connect.command.SparkConnectCommandPlanner
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.{
+  AdaptiveSparkPlanExec,
+  AdaptiveSparkPlanHelper,
+  QueryStageExec
+}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.util.ArrowUtils
+
+@Experimental
+@Since("3.3.1")
+class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) extends Logging {
+
+  def handle(v: Request): Unit = {
+// Preconditions.checkState(v.userContext.nonEmpty, "User Context must be present")
+val session =
+  SparkConnectService.getOrCreateIsolatedSession(v.getUserContext.getUserId).session
+v.getPlan.getOpTypeCase match {
+  case proto.Plan.OpTypeCase.COMMAND => handleCommand(session, v)
+  case proto.Plan.OpTypeCase.ROOT => handlePlan(session, v)
+  case _ =>
+throw new UnsupportedOperationException(s"${v.getPlan.getOpTypeCase} not supported.")
+}
+  }
+
+  def handlePlan(session: SparkSession, request: proto.Request): Unit = {
+// Extract the plan from the request and convert it to a logical plan
+val planner = new SparkConnectPlanner(request.getPlan.getRoot, session)
+val rows =
+  Dataset.ofRows(session, planner.transform())
+processRows(request.getClientId, rows)
+  }
+
+  private def processRows(clientId: String, rows: DataFrame) = {
+val timeZoneId = SQLConf.get.sessionLocalTimeZone
+val schema =
+  ByteString.copyFrom(ArrowUtils.toArrowSchema(rows.schema, 
timeZoneId).toByteArray)
+
+val textSchema = rows.schema.fields.map(f => f.name).mkString("|")
+
+// TODO empty results (except limit 0) will not yield a schema.

Review Comment:
   This was a leftover from a different implementation. Removed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977633416


##
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.ByteString
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{Request, Response}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.connect.command.SparkConnectCommandPlanner
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.{
+  AdaptiveSparkPlanExec,
+  AdaptiveSparkPlanHelper,
+  QueryStageExec
+}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.util.ArrowUtils
+
+@Experimental
+@Since("3.3.1")
+class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) 
extends Logging {
+
+  def handle(v: Request): Unit = {
+// Preconditions.checkState(v.userContext.nonEmpty, "User Context must be 
present")

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977632805


##
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+
+/**
+ * The SparkConnectService Implementation.
+ *
+ * This class implements the service stub from the generated code of GRPC.
+ *
+ * @param debug
+ *   delegates debug behavior to the handlers.
+ */
+@Experimental
+@Since("3.3.1")
+class SparkConnectService(debug: Boolean)
+extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
+with Logging {
+
+  /**
+   * This is the main entry method for Spark Connect and all calls to execute 
a plan.
+   *
+   * The plan execution is delegated to the [[SparkConnectStreamHandler]]. All 
error handling
+   * should be directly implemented in the deferred implementation. But this 
method catches
+   * generic errors.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def executePlan(request: Request, responseObserver: 
StreamObserver[Response]): Unit = {
+try {
+  new SparkConnectStreamHandler(responseObserver).handle(request)
+} catch {
+  case e: Exception =>
+log.error("Error executing plan.", e)
+responseObserver.onError(
+  
Status.UNKNOWN.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
+}
+  }
+
+  /**
+   * Analyze a plan provide metadata and debugging information.
+   *
+   * This method is called to generate the explain plan for a SparkConnect 
plan. In its simplest
+   * implementation, the plan that is generated by the [[SparkConnectPlanner]] 
is used to build a
+   * [[Dataset]] and derive the explain string from the query execution 
details.
+   *
+   * Errors during planning are returned via the [[StreamObserver]] interface.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def analyzePlan(
+  request: Request,
+  responseObserver: StreamObserver[AnalyzeResponse]): Unit = {
+try {
+  val session =
+
SparkConnectService.getOrCreateIsolatedSession(request.getUserContext.getUserId).session
+
+  val logicalPlan = request.getPlan.getOpTypeCase match {
+case proto.Plan.OpTypeCase.ROOT =>
+  new SparkConnectPlanner(request.getPlan.getRoot, session).transform()
+case _ =>
+  responseObserver.onError(
+new UnsupportedOperationException(
+  s"${request.getPlan.getOpTypeCase} not supported for analysis."))
+  return
+  }
+  val ds = Dataset.ofRows(session, logicalPlan)
+  val explainString = ds.queryExecution.explainString(ExtendedMode)
+
+  val resp = proto.AnalyzeResponse
+.newBuilder()
+.setExplainString(explainString)
+.setClientId(request.getClientId)
+
+  resp.addAllColumnTypes(ds.schema.fields.map(_.dataType.sql).toSeq.asJava)
+  resp.addAllColumnNames(ds.schema.fields.map(_.name).toSeq.asJava)
+  responseObserver.onNext(resp.build())
+  responseObserver.onCompleted()
+} catch {
+  case e: Exception =>
+log.error("Error analyzing plan.", e)
+  

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977633071


##
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.ByteString
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{Request, Response}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.connect.command.SparkConnectCommandPlanner
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.{
+  AdaptiveSparkPlanExec,
+  AdaptiveSparkPlanHelper,
+  QueryStageExec
+}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.util.ArrowUtils
+
+@Experimental
+@Since("3.3.1")

Review Comment:
   Done, missed pushing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977631655


##
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+
+/**
+ * The SparkConnectService Implementation.
+ *
+ * This class implements the service stub from the generated code of GRPC.
+ *
+ * @param debug
+ *   delegates debug behavior to the handlers.
+ */
+@Experimental
+@Since("3.3.1")
+class SparkConnectService(debug: Boolean)
+extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
+with Logging {
+
+  /**
+   * This is the main entry method for Spark Connect and all calls to execute 
a plan.
+   *
+   * The plan execution is delegated to the [[SparkConnectStreamHandler]]. All 
error handling
+   * should be directly implemented in the deferred implementation. But this 
method catches
+   * generic errors.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def executePlan(request: Request, responseObserver: 
StreamObserver[Response]): Unit = {
+try {
+  new SparkConnectStreamHandler(responseObserver).handle(request)
+} catch {
+  case e: Exception =>
+log.error("Error executing plan.", e)
+responseObserver.onError(
+  
Status.UNKNOWN.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
+}
+  }
+
+  /**
+   * Analyze a plan provide metadata and debugging information.
+   *
+   * This method is called to generate the explain plan for a SparkConnect 
plan. In its simplest
+   * implementation, the plan that is generated by the [[SparkConnectPlanner]] 
is used to build a
+   * [[Dataset]] and derive the explain string from the query execution 
details.
+   *
+   * Errors during planning are returned via the [[StreamObserver]] interface.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def analyzePlan(
+  request: Request,
+  responseObserver: StreamObserver[AnalyzeResponse]): Unit = {
+try {
+  val session =
+
SparkConnectService.getOrCreateIsolatedSession(request.getUserContext.getUserId).session
+
+  val logicalPlan = request.getPlan.getOpTypeCase match {
+case proto.Plan.OpTypeCase.ROOT =>
+  new SparkConnectPlanner(request.getPlan.getRoot, session).transform()
+case _ =>
+  responseObserver.onError(
+new UnsupportedOperationException(
+  s"${request.getPlan.getOpTypeCase} not supported for analysis."))
+  return
+  }
+  val ds = Dataset.ofRows(session, logicalPlan)
+  val explainString = ds.queryExecution.explainString(ExtendedMode)
+
+  val resp = proto.AnalyzeResponse
+.newBuilder()
+.setExplainString(explainString)
+.setClientId(request.getClientId)
+
+  resp.addAllColumnTypes(ds.schema.fields.map(_.dataType.sql).toSeq.asJava)
+  resp.addAllColumnNames(ds.schema.fields.map(_.name).toSeq.asJava)
+  responseObserver.onNext(resp.build())
+  responseObserver.onCompleted()
+} catch {
+  case e: Exception =>
+log.error("Error analyzing plan.", e)
+  

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977631190


##
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+
+/**
+ * The SparkConnectService Implementation.
+ *
+ * This class implements the service stub from the generated code of GRPC.
+ *
+ * @param debug
+ *   delegates debug behavior to the handlers.
+ */
+@Experimental
+@Since("3.3.1")
+class SparkConnectService(debug: Boolean)
+extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
+with Logging {
+
+  /**
+   * This is the main entry method for Spark Connect and all calls to execute 
a plan.
+   *
+   * The plan execution is delegated to the [[SparkConnectStreamHandler]]. All 
error handling
+   * should be directly implemented in the deferred implementation. But this 
method catches
+   * generic errors.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def executePlan(request: Request, responseObserver: 
StreamObserver[Response]): Unit = {
+try {
+  new SparkConnectStreamHandler(responseObserver).handle(request)
+} catch {
+  case e: Exception =>
+log.error("Error executing plan.", e)
+responseObserver.onError(
+  
Status.UNKNOWN.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
+}
+  }
+
+  /**
+   * Analyze a plan provide metadata and debugging information.
+   *
+   * This method is called to generate the explain plan for a SparkConnect 
plan. In its simplest
+   * implementation, the plan that is generated by the [[SparkConnectPlanner]] 
is used to build a
+   * [[Dataset]] and derive the explain string from the query execution 
details.
+   *
+   * Errors during planning are returned via the [[StreamObserver]] interface.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def analyzePlan(
+  request: Request,
+  responseObserver: StreamObserver[AnalyzeResponse]): Unit = {
+try {
+  val session =
+
SparkConnectService.getOrCreateIsolatedSession(request.getUserContext.getUserId).session
+
+  val logicalPlan = request.getPlan.getOpTypeCase match {
+case proto.Plan.OpTypeCase.ROOT =>
+  new SparkConnectPlanner(request.getPlan.getRoot, session).transform()
+case _ =>
+  responseObserver.onError(
+new UnsupportedOperationException(
+  s"${request.getPlan.getOpTypeCase} not supported for analysis."))
+  return
+  }
+  val ds = Dataset.ofRows(session, logicalPlan)
+  val explainString = ds.queryExecution.explainString(ExtendedMode)
+
+  val resp = proto.AnalyzeResponse
+.newBuilder()
+.setExplainString(explainString)
+.setClientId(request.getClientId)
+
+  resp.addAllColumnTypes(ds.schema.fields.map(_.dataType.sql).toSeq.asJava)
+  resp.addAllColumnNames(ds.schema.fields.map(_.name).toSeq.asJava)
+  responseObserver.onNext(resp.build())
+  responseObserver.onCompleted()
+} catch {
+  case e: Exception =>
+log.error("Error analyzing plan.", e)
+  

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977629941


##
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+
+/**
+ * The SparkConnectService Implementation.
+ *
+ * This class implements the service stub from the generated code of GRPC.
+ *
+ * @param debug
+ *   delegates debug behavior to the handlers.
+ */
+@Experimental
+@Since("3.3.1")
+class SparkConnectService(debug: Boolean)
+extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
+with Logging {
+
+  /**
+   * This is the main entry method for Spark Connect and all calls to execute 
a plan.
+   *
+   * The plan execution is delegated to the [[SparkConnectStreamHandler]]. All 
error handling
+   * should be directly implemented in the deferred implementation. But this 
method catches
+   * generic errors.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def executePlan(request: Request, responseObserver: 
StreamObserver[Response]): Unit = {
+try {
+  new SparkConnectStreamHandler(responseObserver).handle(request)
+} catch {
+  case e: Exception =>

Review Comment:
   Done
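
(The original review ask is not quoted in this excerpt, so the following is only a hedged sketch: if the concern was the breadth of `case e: Exception`, one common alternative is to catch only non-fatal throwables via `scala.util.control.NonFatal`.)

    import io.grpc.Status
    import io.grpc.stub.StreamObserver
    import scala.util.control.NonFatal

    // Hedged sketch only, not the PR's actual change: convert non-fatal errors
    // into a gRPC status and let fatal ones (e.g. OutOfMemoryError) propagate.
    def runAndReport[T](responseObserver: StreamObserver[T])(body: => Unit): Unit = {
      try {
        body
      } catch {
        case NonFatal(e) =>
          responseObserver.onError(
            Status.INTERNAL.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
      }
    }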



##
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977629253


##
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+
+/**
+ * The SparkConnectService Implementation.
+ *
+ * This class implements the service stub from the generated code of GRPC.
+ *
+ * @param debug
+ *   delegates debug behavior to the handlers.
+ */
+@Experimental
+@Since("3.3.1")

Review Comment:
   Done, missed pushing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977629581


##
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+
+/**
+ * The SparkConnectService Implementation.

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977629015


##
connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{expressions, plans}
+import org.apache.spark.sql.catalyst.analysis.{
+  UnresolvedAlias,
+  UnresolvedAttribute,
+  UnresolvedFunction,
+  UnresolvedRelation,
+  UnresolvedStar
+}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.types.{
+  BinaryType,
+  ByteType,
+  DateType,
+  DoubleType,
+  FloatType,
+  IntegerType,
+  ShortType,
+  TimestampType
+}
+
+final case class InvalidPlanInput(
+private val message: String = "",
+private val cause: Throwable = None.orNull)
+extends Exception(message, cause)
+
+@Experimental
+@Since("3.3.1")
+class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) {
+
+  def transform(): LogicalPlan = {
+transformRelation(plan)
+  }
+
+  // The root of the query plan is a relation and we apply the transformations 
to it.
+  private def transformRelation(rel: proto.Relation): LogicalPlan = {
+val common = if (rel.hasCommon) {
+  Some(rel.getCommon)
+} else {
+  None
+}
+
+rel.getRelTypeCase match {
+  case proto.Relation.RelTypeCase.READ => transformReadRel(rel.getRead, 
common)
+  case proto.Relation.RelTypeCase.PROJECT => 
transformProject(rel.getProject, common)
+  case proto.Relation.RelTypeCase.FILTER => transformFilter(rel.getFilter)
+  case proto.Relation.RelTypeCase.FETCH => transformFetch(rel.getFetch)
+  case proto.Relation.RelTypeCase.JOIN => transformJoin(rel.getJoin)
+  case proto.Relation.RelTypeCase.UNION => transformUnion(rel.getUnion)
+  case proto.Relation.RelTypeCase.SORT => transformSort(rel.getSort)
+  case proto.Relation.RelTypeCase.AGGREGATE => 
transformAggregate(rel.getAggregate)
+  case proto.Relation.RelTypeCase.SQL => transformSql(rel.getSql)
+  case proto.Relation.RelTypeCase.RELTYPE_NOT_SET =>
+throw new IndexOutOfBoundsException("Expected Relation to be set, but 
is empty.")
+  case _ => throw InvalidPlanInput(s"${rel.getUnknown} not supported.")
+}
+  }
+
+  private def transformSql(sql: proto.SQL): LogicalPlan = {
+session.sessionState.sqlParser.parsePlan(sql.getQuery)
+  }
+
+  private def transformReadRel(
+  rel: proto.Read,
+  common: Option[proto.RelationCommon]): LogicalPlan = {
+val baseRelation = rel.getReadTypeCase match {
+  case proto.Read.ReadTypeCase.NAMED_TABLE =>
+val child = 
UnresolvedRelation(rel.getNamedTable.getPartsList.asScala.toSeq)
+if (common.nonEmpty && common.get.getAlias.nonEmpty) {
+  SubqueryAlias(identifier = common.get.getAlias, child = child)
+} else {
+  child
+}
+  case _ => throw InvalidPlanInput()
+}
+baseRelation
+  }
+
+  private def transformFilter(rel: proto.Filter): LogicalPlan = {
+assert(rel.hasInput)
+val baseRel = transformRelation(rel.getInput)
+logical.Filter(condition = transformExpression(rel.getCondition), child = 
baseRel)
+  }
+
+  private def transformProject(
+  rel: proto.Project,
+  common: Option[proto.RelationCommon]): LogicalPlan = {
+val baseRel = transformRelation(rel.getInput)
+val projection = if (rel.getExpressionsCount == 0) {
+  Seq(UnresolvedStar(Option.empty))
+} else {
+  
rel.getExpressionsList.asScala.map(transformExpression).map(UnresolvedAlias(_))
+}
+val project = logical.Project(projectList = projection.toSeq, child = 
baseRel)
+if (common.nonEmpty && common.get.getAlias.nonEmpty) {
+  logical.SubqueryAlias(identifier = common.get.getAlias, child = 

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977627150


##
connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{expressions, plans}
+import org.apache.spark.sql.catalyst.analysis.{
+  UnresolvedAlias,
+  UnresolvedAttribute,
+  UnresolvedFunction,
+  UnresolvedRelation,
+  UnresolvedStar
+}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.types.{
+  BinaryType,
+  ByteType,
+  DateType,
+  DoubleType,
+  FloatType,
+  IntegerType,
+  ShortType,
+  TimestampType
+}
+
+final case class InvalidPlanInput(
+private val message: String = "",
+private val cause: Throwable = None.orNull)
+extends Exception(message, cause)
+
+@Experimental
+@Since("3.3.1")
+class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) {
+
+  def transform(): LogicalPlan = {
+transformRelation(plan)
+  }
+
+  // The root of the query plan is a relation and we apply the transformations 
to it.
+  private def transformRelation(rel: proto.Relation): LogicalPlan = {
+val common = if (rel.hasCommon) {
+  Some(rel.getCommon)
+} else {
+  None
+}
+
+rel.getRelTypeCase match {
+  case proto.Relation.RelTypeCase.READ => transformReadRel(rel.getRead, 
common)
+  case proto.Relation.RelTypeCase.PROJECT => 
transformProject(rel.getProject, common)
+  case proto.Relation.RelTypeCase.FILTER => transformFilter(rel.getFilter)
+  case proto.Relation.RelTypeCase.FETCH => transformFetch(rel.getFetch)
+  case proto.Relation.RelTypeCase.JOIN => transformJoin(rel.getJoin)
+  case proto.Relation.RelTypeCase.UNION => transformUnion(rel.getUnion)
+  case proto.Relation.RelTypeCase.SORT => transformSort(rel.getSort)
+  case proto.Relation.RelTypeCase.AGGREGATE => 
transformAggregate(rel.getAggregate)
+  case proto.Relation.RelTypeCase.SQL => transformSql(rel.getSql)
+  case proto.Relation.RelTypeCase.RELTYPE_NOT_SET =>
+throw new IndexOutOfBoundsException("Expected Relation to be set, but 
is empty.")
+  case _ => throw InvalidPlanInput(s"${rel.getUnknown} not supported.")
+}
+  }
+
+  private def transformSql(sql: proto.SQL): LogicalPlan = {
+session.sessionState.sqlParser.parsePlan(sql.getQuery)
+  }
+
+  private def transformReadRel(
+  rel: proto.Read,
+  common: Option[proto.RelationCommon]): LogicalPlan = {
+val baseRelation = rel.getReadTypeCase match {
+  case proto.Read.ReadTypeCase.NAMED_TABLE =>
+val child = 
UnresolvedRelation(rel.getNamedTable.getPartsList.asScala.toSeq)
+if (common.nonEmpty && common.get.getAlias.nonEmpty) {
+  SubqueryAlias(identifier = common.get.getAlias, child = child)
+} else {
+  child
+}
+  case _ => throw InvalidPlanInput()
+}
+baseRelation
+  }
+
+  private def transformFilter(rel: proto.Filter): LogicalPlan = {
+assert(rel.hasInput)
+val baseRel = transformRelation(rel.getInput)
+logical.Filter(condition = transformExpression(rel.getCondition), child = 
baseRel)
+  }
+
+  private def transformProject(
+  rel: proto.Project,
+  common: Option[proto.RelationCommon]): LogicalPlan = {
+val baseRel = transformRelation(rel.getInput)
+val projection = if (rel.getExpressionsCount == 0) {
+  Seq(UnresolvedStar(Option.empty))
+} else {
+  
rel.getExpressionsList.asScala.map(transformExpression).map(UnresolvedAlias(_))
+}
+val project = logical.Project(projectList = projection.toSeq, child = 
baseRel)
+if (common.nonEmpty && common.get.getAlias.nonEmpty) {
+  logical.SubqueryAlias(identifier = common.get.getAlias, child = 

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977625111


##
connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.command
+
+import scala.collection.JavaConverters._
+
+import com.google.common.collect.{Lists, Maps}
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
+import org.apache.spark.sql.types.StringType
+
+@Experimental
+@Since("3.3.1")
+class SparkConnectCommandPlanner(session: SparkSession, command: 
proto.Command) {
+
+  lazy val pythonVersion =
+sys.env.getOrElse("PYSPARK_PYTHON", 
sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python3"))
+
+  def process(): Unit = {
+command.getCommandTypeCase match {
+  case proto.Command.CommandTypeCase.CREATE_FUNCTION =>
+handleCreateScalarFunction(command.getCreateFunction)
+  case _ => throw new UnsupportedOperationException(s"${command} not 
supported.")
+}
+  }
+
+  // This is a helper function that registers a new Python function in the
+  // [[SparkSession]].
+  def handleCreateScalarFunction(cf: proto.CreateScalarFunction): Unit = {
+val function = SimplePythonFunction(
+  cf.getSerializedFunction.toByteArray,
+  Maps.newHashMap(),
+  Lists.newArrayList(),
+  pythonVersion,
+  "3.9", // TODO This needs to be an actual version.

Review Comment:
   Created SPARK-40532.
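
For context, the hard-coded `"3.9"` literal above is what SPARK-40532 tracks. A minimal, purely illustrative sketch of deriving the interpreter version at runtime (not necessarily the fix that ticket will land) could look like:

    import scala.sys.process._

    // Illustrative only: ask the configured Python binary for its version instead
    // of hard-coding "3.9". Assumes `pythonExec` points at a working interpreter.
    def pythonVersion(pythonExec: String): String = {
      val out = Seq(pythonExec, "--version").!!.trim   // e.g. "Python 3.9.13"
      out.stripPrefix("Python ").split('.').take(2).mkString(".")
    }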



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-22 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977622423


##
connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.command
+
+import scala.collection.JavaConverters._
+
+import com.google.common.collect.{Lists, Maps}
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
+import org.apache.spark.sql.types.StringType
+
+@Experimental
+@Since("3.3.1")
+class SparkConnectCommandPlanner(session: SparkSession, command: 
proto.Command) {
+
+  lazy val pythonVersion =
+sys.env.getOrElse("PYSPARK_PYTHON", 
sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python3"))
+
+  def process(): Unit = {
+command.getCommandTypeCase match {
+  case proto.Command.CommandTypeCase.CREATE_FUNCTION =>
+handleCreateScalarFunction(command.getCreateFunction)
+  case _ => throw new UnsupportedOperationException(s"${command} not 
supported.")
+}
+  }
+
+  // This is a helper function that registers a new Python function in the
+  // [[SparkSession]].

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-21 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r976770848


##
connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.command
+
+import scala.collection.JavaConverters._
+
+import com.google.common.collect.{Lists, Maps}
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
+import org.apache.spark.sql.types.StringType
+
+@Experimental
+@Since("3.3.1")

Review Comment:
   Sorry, I wasn't sure which version to pick. Changed it to 3.4.0.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-21 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r976770428


##
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+
+/**
+ * The SparkConnectService Implementation.
+ *
+ * This class implements the service stub from the generated code of GRPC.
+ *
+ * @param debug
+ *   delegates debug behavior to the handlers.
+ */
+@Experimental
+@Since("3.3.1")
+class SparkConnectService(debug: Boolean)
+extends SparkConnectServiceGrpc.SparkConnectServiceImplBase {
+
+  /**
+   * This is the main entry method for Spark Connect and all calls to execute 
a plan.
+   *
+   * The plan execution is delegated to the [[SparkConnectStreamHandler]]. All 
error handling
+   * should be directly implemented in the deferred implementation. But this 
method catches
+   * generic errors.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def executePlan(request: Request, responseObserver: 
StreamObserver[Response]): Unit = {
+try {
+  new SparkConnectStreamHandler(responseObserver).handle(request)
+} catch {
+  case e: Exception =>
+e.printStackTrace()

Review Comment:
   Done.
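
For reference, the later revision of this method quoted elsewhere in this thread replaces the stack-trace print with the `Logging` trait's logger plus a gRPC error status; re-typed here from the line-wrapped diff:

    // Excerpt of the updated executePlan, as quoted in the newer diff in this thread.
    override def executePlan(request: Request, responseObserver: StreamObserver[Response]): Unit = {
      try {
        new SparkConnectStreamHandler(responseObserver).handle(request)
      } catch {
        case e: Exception =>
          log.error("Error executing plan.", e)
          responseObserver.onError(
            Status.UNKNOWN.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
      }
    }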



##
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+
+/**
+ * The SparkConnectService Implementation.
+ *
+ * This class 

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-20 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r975782260


##
dev/deps/spark-deps-hadoop-3-hive-2.3:
##
@@ -60,10 +62,20 @@ datanucleus-core/4.1.17//datanucleus-core-4.1.17.jar
 datanucleus-rdbms/4.1.19//datanucleus-rdbms-4.1.19.jar
 derby/10.14.2.0//derby-10.14.2.0.jar
 
dropwizard-metrics-hadoop-metrics2-reporter/0.1.2//dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar
+error_prone_annotations/2.10.0//error_prone_annotations-2.10.0.jar
+failureaccess/1.0.1//failureaccess-1.0.1.jar
 flatbuffers-java/1.12.0//flatbuffers-java-1.12.0.jar
 gcs-connector/hadoop3-2.2.7/shaded/gcs-connector-hadoop3-2.2.7-shaded.jar
 generex/1.0.2//generex-1.0.2.jar
 gmetric4j/1.0.10//gmetric4j-1.0.10.jar
+grpc-api/1.47.0//grpc-api-1.47.0.jar

Review Comment:
   Yes, the dependencies are shaded, but the `dev/test-depdencies.sh` script 
fails if there are not listed here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-20 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r975731244


##
project/SparkBuild.scala:
##
@@ -753,6 +815,7 @@ object OldDeps {
   }
 
   def oldDepsSettings() = Defaults.coreDefaultSettings ++ Seq(
+PB.protocVersion := "3.21.1",

Review Comment:
   Generalized the version into a variable.
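
   A minimal sketch of that generalization in `project/SparkBuild.scala` (the object 
and value names are illustrative, not necessarily the ones the PR ends up using):

   ```
   // Keep the protobuf/protoc version in one place...
   object BuildCommons {
     val protoVersion = "3.21.1"
   }

   // ...and reference it from the sbt-protoc setting instead of repeating the literal.
   // (PB.protocVersion is the setting key contributed by the sbt-protoc plugin.)
   PB.protocVersion := BuildCommons.protoVersion
   ```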



##
connect/src/main/scala/org/apache/spark/sql/sparkconnect/command/SparkConnectCommandPlanner.scala:
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sparkconnect.command
+
+import com.google.common.collect.{Lists, Maps}
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
+import org.apache.spark.connect.{proto => proto}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
+import org.apache.spark.sql.types.StringType
+
+@Experimental
+class SparkConnectCommandPlanner(session: SparkSession, command: 
proto.Command) {

Review Comment:
   Done, added `@Since("3.3.1")`, is this correct?
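
   For reference, a minimal sketch of the annotation pattern under discussion; the 
version value below is only a guess derived from the `3.4.0-SNAPSHOT` parent version 
in this PR, not a confirmed answer to the question:

   ```
   import org.apache.spark.annotation.{Since, Unstable}
   import org.apache.spark.connect.proto
   import org.apache.spark.sql.SparkSession

   // @Since records the first Spark release that ships this (unstable) API.
   @Unstable
   @Since("3.4.0") // assumption: matches the 3.4.0-SNAPSHOT version this module builds against
   class SparkConnectCommandPlanner(session: SparkSession, command: proto.Command)
   ```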



##
project/SparkBuild.scala:
##
@@ -357,7 +366,10 @@ object SparkBuild extends PomBuild {
 
 // To prevent intermittent compilation failures, see also SPARK-33297
 // Apparently we can remove this when we use JDK 11.
-Test / classLoaderLayeringStrategy := ClassLoaderLayeringStrategy.Flat
+Test / classLoaderLayeringStrategy := ClassLoaderLayeringStrategy.Flat,
+
+// BUG fuck me

Review Comment:
   Done. The SBT build was a major pain. Sorry for the leftover.



##
connect/pom.xml:
##
@@ -0,0 +1,281 @@
+
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.12</artifactId>
+    <version>3.4.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>spark-connect_2.12</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project Connect</name>
+  <url>https://spark.apache.org/</url>
+
+  <properties>
+    <spark.shade.packageName>org.sparkproject.connect</spark.shade.packageName>
+    <sbt.project.name>connect</sbt.project.name>
+    <protobuf.version>3.21.1</protobuf.version>
+    <guava.version>31.0.1-jre</guava.version>
+    <io.grpc.version>1.47.0</io.grpc.version>
+    <tomcat.annotations.api.version>6.0.53</tomcat.annotations.api.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>31.0.1-jre</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>failureaccess</artifactId>
+      <version>1.0.1</version>
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-netty-shaded</artifactId>
+      <version>${io.grpc.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-protobuf</artifactId>
+      <version>${io.grpc.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-services</artifactId>
+      <version>${io.grpc.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-stub</artifactId>
+      <version>${io.grpc.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tomcat</groupId>
+      <artifactId>annotations-api</artifactId>
+      <version>${tomcat.annotations.api.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalacheck</groupId>
+      <artifactId>scalacheck_${scala.binary.version}</artifactId>
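
A hedged sketch of how a property like `spark.shade.packageName` is typically consumed 
by the maven-shade-plugin further down in such a pom (illustrative only; the PR's actual 
`<build>` section is not quoted above):

```
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <relocations>
      <!-- Relocate the bundled Guava and gRPC classes so they cannot clash
           with anything already on Spark's driver classpath. -->
      <relocation>
        <pattern>com.google.common</pattern>
        <shadedPattern>${spark.shade.packageName}.guava</shadedPattern>
      </relocation>
      <relocation>
        <pattern>io.grpc</pattern>
        <shadedPattern>${spark.shade.packageName}.grpc</shadedPattern>
      </relocation>
    </relocations>
  </configuration>
</plugin>
```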

[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-20 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r975725801


##
dev/infra/Dockerfile:
##
@@ -65,3 +65,6 @@ RUN Rscript -e "devtools::install_version('roxygen2', 
version='7.2.0', repos='ht
 
 # See more in SPARK-39735
 ENV R_LIBS_SITE 
"/usr/local/lib/R/site-library:${R_LIBS_SITE}:/usr/lib/R/library"
+
+# Add Python Deps for Spark Connect.
+RUN python3.9 -m pip install grpcio protobuf

Review Comment:
   Done.
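
   A pinned variant, in case reproducible CI images are preferred (the version numbers 
here are illustrative only; the hunk above installs whatever is current on PyPI):

   ```
   # Pin the Spark Connect Python dependencies so image rebuilds stay reproducible.
   RUN python3.9 -m pip install 'grpcio==1.48.1' 'protobuf==4.21.6'
   ```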



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-19 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r974611991


##
connect/src/main/scala/org/apache/spark/sql/sparkconnect/planner/SparkConnectPlanner.scala:
##
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sparkconnect.planner
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{expressions, plans}
+import org.apache.spark.sql.catalyst.analysis.{
+  UnresolvedAlias,
+  UnresolvedAttribute,
+  UnresolvedFunction,
+  UnresolvedRelation,
+  UnresolvedStar

Review Comment:
   Basically, if I run
   
   ```
   import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, 
UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
   ```
   
   through
   
   ```
   ./build/mvn -Pscala-2.12 scalafmt:format -Dscalafmt.skip=false 
-Dscalafmt.onlyChanged=false -pl connect
   ```
   
   it becomes this.
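
   If the intent is to keep those selectors on one line automatically, a hedged 
`.scalafmt.conf` tweak is the usual lever (key name per scalafmt's `importSelectors` 
option; whether that style is wanted here is a separate question):

   ```
   # .scalafmt.conf (sketch)
   importSelectors = singleLine
   ```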



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-19 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r974602945


##
connect/src/main/scala/org/apache/spark/sql/sparkconnect/planner/SparkConnectPlanner.scala:
##
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sparkconnect.planner
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{expressions, plans}
+import org.apache.spark.sql.catalyst.analysis.{
+  UnresolvedAlias,
+  UnresolvedAttribute,
+  UnresolvedFunction,
+  UnresolvedRelation,
+  UnresolvedStar

Review Comment:
   I'm running this through `scalafmt` and checkstyle and it doesn't complain. 
What's the best way to solve this automatically?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-19 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r974599749


##
connect/src/main/scala/org/apache/spark/sql/sparkconnect/command/SparkConnectCommandPlanner.scala:
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sparkconnect.command

Review Comment:
   I'll change the package to `connect`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org