HyukjinKwon commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r977269368


##########
connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.command
+
+import scala.collection.JavaConverters._
+
+import com.google.common.collect.{Lists, Maps}
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
+import org.apache.spark.sql.types.StringType
+
+@Experimental
+@Since("3.3.1")
+class SparkConnectCommandPlanner(session: SparkSession, command: 
proto.Command) {
+
+  lazy val pythonVersion =
+    sys.env.getOrElse("PYSPARK_PYTHON", 
sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python3"))
+
+  def process(): Unit = {
+    command.getCommandTypeCase match {
+      case proto.Command.CommandTypeCase.CREATE_FUNCTION =>
+        handleCreateScalarFunction(command.getCreateFunction)
+      case _ => throw new UnsupportedOperationException(s"${command} not 
supported.")
+    }
+  }
+
+  // This is a helper function that registers a new Python function in the
+  // [[SparkSession]].

Review Comment:
   ```suggestion
     // `SparkSession`.
   ```
   
   Since this isn't in a Scaladoc



##########
connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{expressions, plans}
+import org.apache.spark.sql.catalyst.analysis.{
+  UnresolvedAlias,
+  UnresolvedAttribute,
+  UnresolvedFunction,
+  UnresolvedRelation,
+  UnresolvedStar
+}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.types.{
+  BinaryType,
+  ByteType,
+  DateType,
+  DoubleType,
+  FloatType,
+  IntegerType,
+  ShortType,
+  TimestampType
+}
+
+final case class InvalidPlanInput(
+    private val message: String = "",
+    private val cause: Throwable = None.orNull)
+    extends Exception(message, cause)
+
+@Experimental
+@Since("3.3.1")
+class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) {
+
+  def transform(): LogicalPlan = {
+    transformRelation(plan)
+  }
+
+  // The root of the query plan is a relation and we apply the transformations 
to it.
+  private def transformRelation(rel: proto.Relation): LogicalPlan = {
+    val common = if (rel.hasCommon) {
+      Some(rel.getCommon)
+    } else {
+      None
+    }
+
+    rel.getRelTypeCase match {
+      case proto.Relation.RelTypeCase.READ => transformReadRel(rel.getRead, 
common)
+      case proto.Relation.RelTypeCase.PROJECT => 
transformProject(rel.getProject, common)
+      case proto.Relation.RelTypeCase.FILTER => transformFilter(rel.getFilter)
+      case proto.Relation.RelTypeCase.FETCH => transformFetch(rel.getFetch)
+      case proto.Relation.RelTypeCase.JOIN => transformJoin(rel.getJoin)
+      case proto.Relation.RelTypeCase.UNION => transformUnion(rel.getUnion)
+      case proto.Relation.RelTypeCase.SORT => transformSort(rel.getSort)
+      case proto.Relation.RelTypeCase.AGGREGATE => 
transformAggregate(rel.getAggregate)
+      case proto.Relation.RelTypeCase.SQL => transformSql(rel.getSql)
+      case proto.Relation.RelTypeCase.RELTYPE_NOT_SET =>
+        throw new IndexOutOfBoundsException("Expected Relation to be set, but 
is empty.")
+      case _ => throw InvalidPlanInput(s"${rel.getUnknown} not supported.")
+    }
+  }
+
+  private def transformSql(sql: proto.SQL): LogicalPlan = {
+    session.sessionState.sqlParser.parsePlan(sql.getQuery)
+  }
+
+  private def transformReadRel(
+      rel: proto.Read,
+      common: Option[proto.RelationCommon]): LogicalPlan = {
+    val baseRelation = rel.getReadTypeCase match {
+      case proto.Read.ReadTypeCase.NAMED_TABLE =>
+        val child = 
UnresolvedRelation(rel.getNamedTable.getPartsList.asScala.toSeq)
+        if (common.nonEmpty && common.get.getAlias.nonEmpty) {
+          SubqueryAlias(identifier = common.get.getAlias, child = child)
+        } else {
+          child
+        }
+      case _ => throw InvalidPlanInput()
+    }
+    baseRelation
+  }
+
+  private def transformFilter(rel: proto.Filter): LogicalPlan = {
+    assert(rel.hasInput)
+    val baseRel = transformRelation(rel.getInput)
+    logical.Filter(condition = transformExpression(rel.getCondition), child = 
baseRel)
+  }
+
+  private def transformProject(
+      rel: proto.Project,
+      common: Option[proto.RelationCommon]): LogicalPlan = {
+    val baseRel = transformRelation(rel.getInput)
+    val projection = if (rel.getExpressionsCount == 0) {
+      Seq(UnresolvedStar(Option.empty))
+    } else {
+      
rel.getExpressionsList.asScala.map(transformExpression).map(UnresolvedAlias(_))
+    }
+    val project = logical.Project(projectList = projection.toSeq, child = 
baseRel)
+    if (common.nonEmpty && common.get.getAlias.nonEmpty) {
+      logical.SubqueryAlias(identifier = common.get.getAlias, child = project)
+    } else {
+      project
+    }
+  }
+
+  private def transformUnresolvedExpression(exp: proto.Expression): 
UnresolvedAttribute = {
+    UnresolvedAttribute(exp.getUnresolvedAttribute.getPartsList.asScala.toSeq)
+  }
+
+  private def transformExpression(exp: proto.Expression): Expression = {
+    exp.getExprTypeCase match {
+      case proto.Expression.ExprTypeCase.LITERAL => 
transformLiteral(exp.getLiteral)
+      case proto.Expression.ExprTypeCase.UNRESOLVED_ATTRIBUTE =>
+        transformUnresolvedExpression(exp)
+      case proto.Expression.ExprTypeCase.UNRESOLVED_FUNCTION =>
+        transformScalarFunction(exp.getUnresolvedFunction)
+      case _ => throw InvalidPlanInput()
+    }
+  }
+
+  /**
+   * Transforms the protocol buffers literal into the appropriate Catalyst 
literal expression.
+   *
+   * TODO: Missing support for Instant, BigDecimal, LocalDate, LocalTimestamp, 
Duration, Period.
+   * @param lit
+   * @return
+   *   Expression
+   */
+  private def transformLiteral(lit: proto.Expression.Literal): Expression = {
+    lit.getLiteralTypeCase match {
+      case proto.Expression.Literal.LiteralTypeCase.BOOLEAN => 
expressions.Literal(lit.getBoolean)
+      case proto.Expression.Literal.LiteralTypeCase.I8 => 
expressions.Literal(lit.getI8, ByteType)
+      case proto.Expression.Literal.LiteralTypeCase.I16 =>
+        expressions.Literal(lit.getI16, ShortType)
+      case proto.Expression.Literal.LiteralTypeCase.I32 => 
expressions.Literal(lit.getI32)
+      case proto.Expression.Literal.LiteralTypeCase.I64 => 
expressions.Literal(lit.getI64)
+      case proto.Expression.Literal.LiteralTypeCase.FP32 =>
+        expressions.Literal(lit.getFp32, FloatType)
+      case proto.Expression.Literal.LiteralTypeCase.FP64 =>
+        expressions.Literal(lit.getFp64, DoubleType)
+      case proto.Expression.Literal.LiteralTypeCase.STRING => 
expressions.Literal(lit.getString)
+      case proto.Expression.Literal.LiteralTypeCase.BINARY =>
+        expressions.Literal(lit.getBinary, BinaryType)
+      // Microseconds since unix epoch.
+      case proto.Expression.Literal.LiteralTypeCase.TIME =>
+        expressions.Literal(lit.getTime, TimestampType)
+      // Days since UNIX epoch.
+      case proto.Expression.Literal.LiteralTypeCase.DATE =>
+        expressions.Literal(lit.getDate, DateType)
+      case _ => throw InvalidPlanInput("Unsupported Literal Type")
+    }
+  }
+
+  private def transformFetch(limit: proto.Fetch): LogicalPlan = {
+    logical.Limit(
+      child = transformRelation(limit.getInput),
+      limitExpr = expressions.Literal(limit.getLimit, IntegerType))
+  }
+
+  private def lookupFunction(name: String, args: Seq[Expression]): Expression 
= {
+    UnresolvedFunction(Seq(name), args, isDistinct = false)
+  }
+
+  private def transformScalarFunction(fun: 
proto.Expression.UnresolvedFunction): Expression = {
+    val funName = fun.getPartsList.asScala.mkString(".")
+    funName match {
+      case "gt" =>
+        expressions.GreaterThan(
+          transformExpression(fun.getArguments(0)),
+          transformExpression(fun.getArguments(1)))
+      case "eq" =>
+        expressions.EqualTo(
+          transformExpression(fun.getArguments(0)),
+          transformExpression(fun.getArguments(1)))
+      case _ =>
+        lookupFunction(funName, 
fun.getArgumentsList.asScala.map(transformExpression).toSeq)
+    }
+  }
+
+  private def transformUnion(u: proto.Union): LogicalPlan = {
+    assert(u.getInputsCount == 2, "Union must have 2 inputs")
+    val plan = logical.Union(transformRelation(u.getInputs(0)), 
transformRelation(u.getInputs(1)))
+
+    u.getUnionType match {
+      case proto.Union.UnionType.UNION_TYPE_DISTINCT => logical.Distinct(plan)
+      case proto.Union.UnionType.UNION_TYPE_ALL => plan
+      case _ =>
+        throw InvalidPlanInput(s"Unsupported set operation 
${u.getUnionTypeValue}")
+    }
+  }
+
+  private def transformJoin(rel: proto.Join): LogicalPlan = {
+    assert(rel.hasLeft && rel.hasRight, "Both join sides must be present")
+    logical.Join(
+      left = transformRelation(rel.getLeft),
+      right = transformRelation(rel.getRight),
+      // TODO

Review Comment:
   Ditto for a JIRA



##########
connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{expressions, plans}
+import org.apache.spark.sql.catalyst.analysis.{
+  UnresolvedAlias,
+  UnresolvedAttribute,
+  UnresolvedFunction,
+  UnresolvedRelation,
+  UnresolvedStar
+}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.types.{
+  BinaryType,
+  ByteType,
+  DateType,
+  DoubleType,
+  FloatType,
+  IntegerType,
+  ShortType,
+  TimestampType
+}
+
+final case class InvalidPlanInput(
+    private val message: String = "",
+    private val cause: Throwable = None.orNull)
+    extends Exception(message, cause)
+
+@Experimental
+@Since("3.3.1")
+class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) {
+
+  def transform(): LogicalPlan = {
+    transformRelation(plan)
+  }
+
+  // The root of the query plan is a relation and we apply the transformations 
to it.
+  private def transformRelation(rel: proto.Relation): LogicalPlan = {
+    val common = if (rel.hasCommon) {
+      Some(rel.getCommon)
+    } else {
+      None
+    }
+
+    rel.getRelTypeCase match {
+      case proto.Relation.RelTypeCase.READ => transformReadRel(rel.getRead, 
common)
+      case proto.Relation.RelTypeCase.PROJECT => 
transformProject(rel.getProject, common)
+      case proto.Relation.RelTypeCase.FILTER => transformFilter(rel.getFilter)
+      case proto.Relation.RelTypeCase.FETCH => transformFetch(rel.getFetch)
+      case proto.Relation.RelTypeCase.JOIN => transformJoin(rel.getJoin)
+      case proto.Relation.RelTypeCase.UNION => transformUnion(rel.getUnion)
+      case proto.Relation.RelTypeCase.SORT => transformSort(rel.getSort)
+      case proto.Relation.RelTypeCase.AGGREGATE => 
transformAggregate(rel.getAggregate)
+      case proto.Relation.RelTypeCase.SQL => transformSql(rel.getSql)
+      case proto.Relation.RelTypeCase.RELTYPE_NOT_SET =>
+        throw new IndexOutOfBoundsException("Expected Relation to be set, but 
is empty.")
+      case _ => throw InvalidPlanInput(s"${rel.getUnknown} not supported.")
+    }
+  }
+
+  private def transformSql(sql: proto.SQL): LogicalPlan = {
+    session.sessionState.sqlParser.parsePlan(sql.getQuery)
+  }
+
+  private def transformReadRel(
+      rel: proto.Read,
+      common: Option[proto.RelationCommon]): LogicalPlan = {
+    val baseRelation = rel.getReadTypeCase match {
+      case proto.Read.ReadTypeCase.NAMED_TABLE =>
+        val child = 
UnresolvedRelation(rel.getNamedTable.getPartsList.asScala.toSeq)
+        if (common.nonEmpty && common.get.getAlias.nonEmpty) {
+          SubqueryAlias(identifier = common.get.getAlias, child = child)
+        } else {
+          child
+        }
+      case _ => throw InvalidPlanInput()
+    }
+    baseRelation
+  }
+
+  private def transformFilter(rel: proto.Filter): LogicalPlan = {
+    assert(rel.hasInput)
+    val baseRel = transformRelation(rel.getInput)
+    logical.Filter(condition = transformExpression(rel.getCondition), child = 
baseRel)
+  }
+
+  private def transformProject(
+      rel: proto.Project,
+      common: Option[proto.RelationCommon]): LogicalPlan = {
+    val baseRel = transformRelation(rel.getInput)
+    val projection = if (rel.getExpressionsCount == 0) {
+      Seq(UnresolvedStar(Option.empty))
+    } else {
+      
rel.getExpressionsList.asScala.map(transformExpression).map(UnresolvedAlias(_))
+    }
+    val project = logical.Project(projectList = projection.toSeq, child = 
baseRel)
+    if (common.nonEmpty && common.get.getAlias.nonEmpty) {
+      logical.SubqueryAlias(identifier = common.get.getAlias, child = project)
+    } else {
+      project
+    }
+  }
+
+  private def transformUnresolvedExpression(exp: proto.Expression): 
UnresolvedAttribute = {
+    UnresolvedAttribute(exp.getUnresolvedAttribute.getPartsList.asScala.toSeq)
+  }
+
+  private def transformExpression(exp: proto.Expression): Expression = {
+    exp.getExprTypeCase match {
+      case proto.Expression.ExprTypeCase.LITERAL => 
transformLiteral(exp.getLiteral)
+      case proto.Expression.ExprTypeCase.UNRESOLVED_ATTRIBUTE =>
+        transformUnresolvedExpression(exp)
+      case proto.Expression.ExprTypeCase.UNRESOLVED_FUNCTION =>
+        transformScalarFunction(exp.getUnresolvedFunction)
+      case _ => throw InvalidPlanInput()
+    }
+  }
+
+  /**
+   * Transforms the protocol buffers literal into the appropriate Catalyst 
literal expression.
+   *
+   * TODO: Missing support for Instant, BigDecimal, LocalDate, LocalTimestamp, 
Duration, Period.

Review Comment:
   Would also better have a JIRA.



##########
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+
+/**
+ * The SparkConnectService Implementation.
+ *
+ * This class implements the service stub from the generated code of GRPC.
+ *
+ * @param debug
+ *   delegates debug behavior to the handlers.
+ */
+@Experimental
+@Since("3.3.1")
+class SparkConnectService(debug: Boolean)
+    extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
+    with Logging {
+
+  /**
+   * This is the main entry method for Spark Connect and all calls to execute 
a plan.
+   *
+   * The plan execution is delegated to the [[SparkConnectStreamHandler]]. All 
error handling
+   * should be directly implemented in the deferred implementation. But this 
method catches
+   * generic errors.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def executePlan(request: Request, responseObserver: 
StreamObserver[Response]): Unit = {
+    try {
+      new SparkConnectStreamHandler(responseObserver).handle(request)
+    } catch {
+      case e: Exception =>
+        log.error("Error executing plan.", e)
+        responseObserver.onError(
+          
Status.UNKNOWN.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
+    }
+  }
+
+  /**
+   * Analyze a plan provide metadata and debugging information.
+   *
+   * This method is called to generate the explain plan for a SparkConnect 
plan. In its simplest
+   * implementation, the plan that is generated by the [[SparkConnectPlanner]] 
is used to build a
+   * [[Dataset]] and derive the explain string from the query execution 
details.
+   *
+   * Errors during planning are returned via the [[StreamObserver]] interface.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def analyzePlan(
+      request: Request,
+      responseObserver: StreamObserver[AnalyzeResponse]): Unit = {
+    try {
+      val session =
+        
SparkConnectService.getOrCreateIsolatedSession(request.getUserContext.getUserId).session
+
+      val logicalPlan = request.getPlan.getOpTypeCase match {
+        case proto.Plan.OpTypeCase.ROOT =>
+          new SparkConnectPlanner(request.getPlan.getRoot, session).transform()
+        case _ =>
+          responseObserver.onError(
+            new UnsupportedOperationException(
+              s"${request.getPlan.getOpTypeCase} not supported for analysis."))
+          return
+      }
+      val ds = Dataset.ofRows(session, logicalPlan)
+      val explainString = ds.queryExecution.explainString(ExtendedMode)
+
+      val resp = proto.AnalyzeResponse
+        .newBuilder()
+        .setExplainString(explainString)
+        .setClientId(request.getClientId)
+
+      resp.addAllColumnTypes(ds.schema.fields.map(_.dataType.sql).toSeq.asJava)
+      resp.addAllColumnNames(ds.schema.fields.map(_.name).toSeq.asJava)
+      responseObserver.onNext(resp.build())
+      responseObserver.onCompleted()
+    } catch {
+      case e: Exception =>
+        log.error("Error analyzing plan.", e)
+        responseObserver.onError(
+          
Status.UNKNOWN.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
+    }
+  }
+}
+
+/**
+ * Trivial object used for referring to SparkSessions in the SessionCache.
+ *
+ * @param userId
+ * @param session
+ */
+@Experimental
+case class SessionHolder(userId: String, session: SparkSession) {}

Review Comment:
   ```suggestion
   case class SessionHolder(userId: String, session: SparkSession)
   ```



##########
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.ByteString
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{Request, Response}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.connect.command.SparkConnectCommandPlanner
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.{
+  AdaptiveSparkPlanExec,
+  AdaptiveSparkPlanHelper,
+  QueryStageExec
+}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.util.ArrowUtils
+
+@Experimental
+@Since("3.3.1")
+class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) 
extends Logging {
+
+  def handle(v: Request): Unit = {
+    // Preconditions.checkState(v.userContext.nonEmpty, "User Context must be 
present")

Review Comment:
   Should probably remove this



##########
dev/tox.ini:
##########
@@ -51,4 +51,6 @@ exclude =
     python/pyspark/worker.pyi,
     python/pyspark/java_gateway.pyi,
     dev/ansible-for-test-node/*,
+    python/pyspark/sql/connect/proto/*,
+    python/venv/*,

Review Comment:
   why do we need this?



##########
project/SparkBuild.scala:
##########
@@ -1031,12 +1105,13 @@ object Unidoc {
                       Seq (
     publish := {},
 
+

Review Comment:
   ```suggestion
   ```



##########
python/pyspark/sql/connect/data_frame.py:
##########
@@ -0,0 +1,241 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import (
+    Any,
+    Dict,
+    List,
+    Optional,
+    Sequence,
+    Tuple,
+    Union,
+    cast,
+    TYPE_CHECKING,
+)
+
+import pyspark.sql.connect.plan as plan
+from pyspark.sql.connect.column import (
+    ColumnOrString,
+    ColumnRef,
+    Expression,
+    ExpressionOrString,
+    LiteralExpression,
+)
+
+if TYPE_CHECKING:
+    from pyspark.sql.connect.client import RemoteSparkSession
+
+
+ColumnOrName = Union[ColumnRef, str]
+
+
+class GroupingFrame(object):
+
+    MeasuresType = Union[Sequence[Tuple[ExpressionOrString, str]], Dict[str, 
str]]
+    OptMeasuresType = Optional[MeasuresType]
+
+    def __init__(self, df: "DataFrame", *grouping_cols: Union[ColumnRef, str]) 
-> None:
+        self._df = df
+        self._grouping_cols = [x if isinstance(x, ColumnRef) else df[x] for x 
in grouping_cols]
+
+    def agg(self, exprs: MeasuresType = None) -> "DataFrame":
+
+        # Normalize the dictionary into a list of tuples.
+        if isinstance(exprs, Dict):
+            measures = list(exprs.items())
+        elif isinstance(exprs, List):
+            measures = exprs
+        else:
+            measures = []
+
+        res = DataFrame.withPlan(
+            plan.Aggregate(
+                child=self._df._plan,
+                grouping_cols=self._grouping_cols,
+                measures=measures,
+            ),
+            session=self._df._session,
+        )
+        return res
+
+    def _map_cols_to_dict(self, fun: str, cols: List[Union[ColumnRef, str]]) 
-> Dict[str, str]:
+        return {x if isinstance(x, str) else cast(ColumnRef, x).name(): fun 
for x in cols}
+
+    def min(self, *cols: Union[ColumnRef, str]) -> "DataFrame":
+        expr = self._map_cols_to_dict("min", list(cols))
+        return self.agg(expr)
+
+    def max(self, *cols: Union[ColumnRef, str]) -> "DataFrame":
+        expr = self._map_cols_to_dict("max", list(cols))
+        return self.agg(expr)
+
+    def sum(self, *cols: Union[ColumnRef, str]) -> "DataFrame":
+        expr = self._map_cols_to_dict("sum", list(cols))
+        return self.agg(expr)
+
+    def count(self) -> "DataFrame":
+        return self.agg([(LiteralExpression(1), "count")])
+
+
+class DataFrame(object):
+    """Every DataFrame object essentially is a Relation that is refined using 
the
+    member functions. Calling a method on a dataframe will essentially return 
a copy
+    of the DataFrame with the changes applied.
+    """
+
+    def __init__(self, data: List[Any] = None, schema: List[str] = None):
+        """Creates a new data frame"""
+        self._schema = schema
+        self._plan: Optional[plan.LogicalPlan] = None
+        self._cache: Dict[str, Any] = {}
+        self._session: "RemoteSparkSession" = None
+
+    @classmethod
+    def withPlan(cls, plan: plan.LogicalPlan, session=None) -> "DataFrame":
+        """Main initialization method used to construct a new data frame with 
a child plan."""
+        new_frame = DataFrame()
+        new_frame._plan = plan
+        new_frame._session = session
+        return new_frame
+
+    def select(self, *cols: ColumnRef) -> "DataFrame":
+        return DataFrame.withPlan(plan.Project(self._plan, *cols), 
session=self._session)
+
+    def agg(self, exprs: Dict[str, str]) -> "DataFrame":
+        return self.groupBy().agg(exprs)
+
+    def alias(self, alias):
+        return DataFrame.withPlan(plan.Project(self._plan).withAlias(alias), 
session=self._session)
+
+    def approxQuantile(self, col, probabilities, relativeError):
+        ...
+
+    def colRegex(self, regex) -> "DataFrame":
+        # TODO needs analysis to pick the right column

Review Comment:
   JIRA



##########
python/pyspark/sql/connect/data_frame.py:
##########
@@ -0,0 +1,241 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import (
+    Any,
+    Dict,
+    List,
+    Optional,
+    Sequence,
+    Tuple,
+    Union,
+    cast,
+    TYPE_CHECKING,
+)
+
+import pyspark.sql.connect.plan as plan
+from pyspark.sql.connect.column import (
+    ColumnOrString,
+    ColumnRef,
+    Expression,
+    ExpressionOrString,
+    LiteralExpression,
+)
+
+if TYPE_CHECKING:
+    from pyspark.sql.connect.client import RemoteSparkSession
+
+
+ColumnOrName = Union[ColumnRef, str]
+
+
+class GroupingFrame(object):
+
+    MeasuresType = Union[Sequence[Tuple[ExpressionOrString, str]], Dict[str, 
str]]
+    OptMeasuresType = Optional[MeasuresType]
+
+    def __init__(self, df: "DataFrame", *grouping_cols: Union[ColumnRef, str]) 
-> None:
+        self._df = df
+        self._grouping_cols = [x if isinstance(x, ColumnRef) else df[x] for x 
in grouping_cols]
+
+    def agg(self, exprs: MeasuresType = None) -> "DataFrame":
+
+        # Normalize the dictionary into a list of tuples.
+        if isinstance(exprs, Dict):
+            measures = list(exprs.items())
+        elif isinstance(exprs, List):
+            measures = exprs
+        else:
+            measures = []
+
+        res = DataFrame.withPlan(
+            plan.Aggregate(
+                child=self._df._plan,
+                grouping_cols=self._grouping_cols,
+                measures=measures,
+            ),
+            session=self._df._session,
+        )
+        return res
+
+    def _map_cols_to_dict(self, fun: str, cols: List[Union[ColumnRef, str]]) 
-> Dict[str, str]:
+        return {x if isinstance(x, str) else cast(ColumnRef, x).name(): fun 
for x in cols}
+
+    def min(self, *cols: Union[ColumnRef, str]) -> "DataFrame":
+        expr = self._map_cols_to_dict("min", list(cols))
+        return self.agg(expr)
+
+    def max(self, *cols: Union[ColumnRef, str]) -> "DataFrame":
+        expr = self._map_cols_to_dict("max", list(cols))
+        return self.agg(expr)
+
+    def sum(self, *cols: Union[ColumnRef, str]) -> "DataFrame":
+        expr = self._map_cols_to_dict("sum", list(cols))
+        return self.agg(expr)
+
+    def count(self) -> "DataFrame":
+        return self.agg([(LiteralExpression(1), "count")])
+
+
+class DataFrame(object):
+    """Every DataFrame object essentially is a Relation that is refined using 
the
+    member functions. Calling a method on a dataframe will essentially return 
a copy
+    of the DataFrame with the changes applied.
+    """
+
+    def __init__(self, data: List[Any] = None, schema: List[str] = None):
+        """Creates a new data frame"""
+        self._schema = schema
+        self._plan: Optional[plan.LogicalPlan] = None
+        self._cache: Dict[str, Any] = {}
+        self._session: "RemoteSparkSession" = None
+
+    @classmethod
+    def withPlan(cls, plan: plan.LogicalPlan, session=None) -> "DataFrame":
+        """Main initialization method used to construct a new data frame with 
a child plan."""
+        new_frame = DataFrame()
+        new_frame._plan = plan
+        new_frame._session = session
+        return new_frame
+
+    def select(self, *cols: ColumnRef) -> "DataFrame":
+        return DataFrame.withPlan(plan.Project(self._plan, *cols), 
session=self._session)
+
+    def agg(self, exprs: Dict[str, str]) -> "DataFrame":
+        return self.groupBy().agg(exprs)
+
+    def alias(self, alias):
+        return DataFrame.withPlan(plan.Project(self._plan).withAlias(alias), 
session=self._session)
+
+    def approxQuantile(self, col, probabilities, relativeError):
+        ...
+
+    def colRegex(self, regex) -> "DataFrame":
+        # TODO needs analysis to pick the right column
+        ...
+
+    @property
+    def columns(self) -> List[str]:
+        """Returns the list of columns of the current data frame."""
+        if self._plan is None:
+            return []
+        if "columns" not in self._cache and self._plan is not None:
+            pdd = self.limit(0).collect()
+            # Translate to standard pytho array
+            self._cache["columns"] = pdd.columns.values
+        return self._cache["columns"]
+
+    def count(self):
+        """Returns the number of rows in the data frame"""
+        return self.agg([(LiteralExpression(1), "count")]).collect().iloc[0, 0]
+
+    def crossJoin(self, other):
+        ...
+
+    def coalesce(self, num_partitions: int) -> "DataFrame":
+        # TODO needs repartition operator for substrait
+        ...
+
+    def describe(self, cols):
+        # TODO needs analyze to filter out the right columns

Review Comment:
   JIRA



##########
python/pyspark/sql/connect/plan.py:
##########
@@ -0,0 +1,468 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import (
+    List,
+    Optional,
+    Sequence,
+    Tuple,
+    Union,
+    cast,
+    TYPE_CHECKING,
+)
+
+import pyspark.sql.connect.proto as proto
+from pyspark.sql.connect.column import (
+    ColumnOrString,
+    ColumnRef,
+    Expression,
+    ExpressionOrString,
+    SortOrder,
+)
+
+
+if TYPE_CHECKING:
+    from pyspark.sql.connect.client import RemoteSparkSession
+
+
+class InputValidationError(Exception):
+    pass
+
+
+class LogicalPlan(object):
+
+    INDENT = 2
+
+    def __init__(self, child: Optional["LogicalPlan"]) -> None:
+        self._child = child
+
+    def unresolved_attr(self, *colNames: str) -> proto.Expression:
+        """Creates an unresolved attribute from a column name."""
+        exp = proto.Expression()
+        exp.unresolved_attribute.parts.extend(list(colNames))
+        return exp
+
+    def to_attr_or_expression(
+        self, col: ColumnOrString, session: "RemoteSparkSession"
+    ) -> proto.Expression:
+        """Returns either an instance of an unresolved attribute or the 
serialized
+        expression value of the column."""
+        if type(col) is str:
+            return self.unresolved_attr(cast(str, col))
+        else:
+            return cast(ColumnRef, col).to_plan(session)
+
+    def plan(self, session: "RemoteSparkSession") -> proto.Relation:
+        ...
+
+    def _verify(self, session: "RemoteSparkSession") -> bool:
+        """This method is used to verify that the current logical plan
+        can be serialized to Proto and back and afterwards is identical."""
+        plan = proto.Plan()
+        plan.root.CopyFrom(self.plan(session))
+
+        serialized_plan = plan.SerializeToString()
+        test_plan = proto.Plan()
+        test_plan.ParseFromString(serialized_plan)
+
+        return test_plan == plan
+
+    # TODO(martin.grund) explain , schema
+    def collect(self, session: "RemoteSparkSession" = None, debug: bool = 
False):
+        plan = proto.Plan()
+        plan.root.CopyFrom(self.plan(session))
+
+        if debug:
+            print(plan)
+
+        return plan
+
+    def _i(self, indent) -> str:
+        return " " * indent
+
+    def print(self, indent=0) -> str:
+        ...
+
+    def _repr_html_(self):
+        ...
+
+
+class Read(LogicalPlan):
+    def __init__(self, table_name: str) -> None:
+        super().__init__(None)
+        self.table_name = table_name
+
+    def plan(self, session: "RemoteSparkSession") -> proto.Relation:
+        plan = proto.Relation()
+        plan.read.named_table.parts.extend(self.table_name.split("."))
+        return plan
+
+    def print(self, indent=0) -> str:
+        return f"{self._i(indent)}<Read table_name={self.table_name}>\n"
+
+    def _repr_html_(self):
+        return f"""
+        <ul>
+            <li>
+                <b>Read</b><br />
+                table name: {self.table_name}
+            </li>
+        </ul>
+        """
+
+
+class Project(LogicalPlan):
+    """Logical plan object for a projection.
+
+    All input arguments are directly serialized into the corresponding 
protocol buffer
+    objects. This class only provides very limited error handling and input 
validation.
+
+    To be compatible with PySpark, we validate that the input arguments are all
+    expressions to be able to serialize them to the server.
+
+    """
+
+    def __init__(self, child: Optional["LogicalPlan"], *columns: 
ExpressionOrString) -> None:
+        super().__init__(child)
+        self._raw_columns = list(columns)
+        self.alias = None
+        self._verify_expressions()
+
+    def _verify_expressions(self):
+        """Ensures that all input arguments are instances of Expression."""
+        for c in self._raw_columns:
+            if not isinstance(c, Expression):
+                raise InputValidationError(f"Only Expressions can be used for 
projections: '{c}'.")
+
+    def withAlias(self, alias) -> LogicalPlan:
+        self.alias = alias
+        return self
+
+    def plan(self, session: "RemoteSparkSession") -> proto.Relation:
+        assert self._child is not None
+        proj_exprs = [
+            c.to_plan(session)
+            if isinstance(c, Expression)
+            else self.unresolved_attr(*cast(str, c).split("."))
+            for c in self._raw_columns
+        ]  # [self.unresolved_attr(*x) for x in self.columns]

Review Comment:
   ```suggestion
           ]
   ```
   
   Maybe remove unused commented codes.



##########
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+
+/**
+ * The SparkConnectService Implementation.
+ *
+ * This class implements the service stub from the generated code of GRPC.
+ *
+ * @param debug
+ *   delegates debug behavior to the handlers.
+ */
+@Experimental
+@Since("3.3.1")

Review Comment:
   ditto 3.4.0



##########
connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{expressions, plans}
+import org.apache.spark.sql.catalyst.analysis.{
+  UnresolvedAlias,
+  UnresolvedAttribute,
+  UnresolvedFunction,
+  UnresolvedRelation,
+  UnresolvedStar
+}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.types.{
+  BinaryType,
+  ByteType,
+  DateType,
+  DoubleType,
+  FloatType,
+  IntegerType,
+  ShortType,
+  TimestampType
+}
+
+final case class InvalidPlanInput(
+    private val message: String = "",
+    private val cause: Throwable = None.orNull)
+    extends Exception(message, cause)
+
+@Experimental
+@Since("3.3.1")

Review Comment:
   ditto, 3.4.0



##########
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+
+/**
+ * The SparkConnectService Implementation.

Review Comment:
   ```suggestion
    * The SparkConnectService implementation.
   ```



##########
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+
+/**
+ * The SparkConnectService Implementation.
+ *
+ * This class implements the service stub from the generated code of GRPC.
+ *
+ * @param debug
+ *   delegates debug behavior to the handlers.
+ */
+@Experimental
+@Since("3.3.1")
+class SparkConnectService(debug: Boolean)
+    extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
+    with Logging {
+
+  /**
+   * This is the main entry method for Spark Connect and all calls to execute 
a plan.
+   *
+   * The plan execution is delegated to the [[SparkConnectStreamHandler]]. All 
error handling
+   * should be directly implemented in the deferred implementation. But this 
method catches
+   * generic errors.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def executePlan(request: Request, responseObserver: 
StreamObserver[Response]): Unit = {
+    try {
+      new SparkConnectStreamHandler(responseObserver).handle(request)
+    } catch {
+      case e: Exception =>
+        log.error("Error executing plan.", e)
+        responseObserver.onError(
+          
Status.UNKNOWN.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
+    }
+  }
+
+  /**
+   * Analyze a plan provide metadata and debugging information.
+   *
+   * This method is called to generate the explain plan for a SparkConnect 
plan. In its simplest
+   * implementation, the plan that is generated by the [[SparkConnectPlanner]] 
is used to build a
+   * [[Dataset]] and derive the explain string from the query execution 
details.
+   *
+   * Errors during planning are returned via the [[StreamObserver]] interface.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def analyzePlan(
+      request: Request,
+      responseObserver: StreamObserver[AnalyzeResponse]): Unit = {
+    try {
+      val session =
+        
SparkConnectService.getOrCreateIsolatedSession(request.getUserContext.getUserId).session
+
+      val logicalPlan = request.getPlan.getOpTypeCase match {
+        case proto.Plan.OpTypeCase.ROOT =>
+          new SparkConnectPlanner(request.getPlan.getRoot, session).transform()
+        case _ =>
+          responseObserver.onError(
+            new UnsupportedOperationException(
+              s"${request.getPlan.getOpTypeCase} not supported for analysis."))
+          return
+      }
+      val ds = Dataset.ofRows(session, logicalPlan)
+      val explainString = ds.queryExecution.explainString(ExtendedMode)
+
+      val resp = proto.AnalyzeResponse
+        .newBuilder()
+        .setExplainString(explainString)
+        .setClientId(request.getClientId)
+
+      resp.addAllColumnTypes(ds.schema.fields.map(_.dataType.sql).toSeq.asJava)
+      resp.addAllColumnNames(ds.schema.fields.map(_.name).toSeq.asJava)
+      responseObserver.onNext(resp.build())
+      responseObserver.onCompleted()
+    } catch {
+      case e: Exception =>
+        log.error("Error analyzing plan.", e)
+        responseObserver.onError(
+          
Status.UNKNOWN.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
+    }
+  }
+}
+
+/**
+ * Trivial object used for referring to SparkSessions in the SessionCache.
+ *
+ * @param userId
+ * @param session
+ */
+@Experimental

Review Comment:
   I think we should actually use `Unstable` instead of `Experimental`.



##########
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+
+/**
+ * The SparkConnectService Implementation.
+ *
+ * This class implements the service stub from the generated code of GRPC.
+ *
+ * @param debug
+ *   delegates debug behavior to the handlers.
+ */
+@Experimental
+@Since("3.3.1")
+class SparkConnectService(debug: Boolean)
+    extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
+    with Logging {
+
+  /**
+   * This is the main entry method for Spark Connect and all calls to execute 
a plan.
+   *
+   * The plan execution is delegated to the [[SparkConnectStreamHandler]]. All 
error handling
+   * should be directly implemented in the deferred implementation. But this 
method catches
+   * generic errors.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def executePlan(request: Request, responseObserver: 
StreamObserver[Response]): Unit = {
+    try {
+      new SparkConnectStreamHandler(responseObserver).handle(request)
+    } catch {
+      case e: Exception =>
+        log.error("Error executing plan.", e)
+        responseObserver.onError(
+          
Status.UNKNOWN.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
+    }
+  }
+
+  /**
+   * Analyze a plan provide metadata and debugging information.
+   *
+   * This method is called to generate the explain plan for a SparkConnect 
plan. In its simplest
+   * implementation, the plan that is generated by the [[SparkConnectPlanner]] 
is used to build a
+   * [[Dataset]] and derive the explain string from the query execution 
details.
+   *
+   * Errors during planning are returned via the [[StreamObserver]] interface.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def analyzePlan(
+      request: Request,
+      responseObserver: StreamObserver[AnalyzeResponse]): Unit = {
+    try {
+      val session =
+        
SparkConnectService.getOrCreateIsolatedSession(request.getUserContext.getUserId).session
+
+      val logicalPlan = request.getPlan.getOpTypeCase match {
+        case proto.Plan.OpTypeCase.ROOT =>
+          new SparkConnectPlanner(request.getPlan.getRoot, session).transform()
+        case _ =>
+          responseObserver.onError(
+            new UnsupportedOperationException(
+              s"${request.getPlan.getOpTypeCase} not supported for analysis."))
+          return
+      }
+      val ds = Dataset.ofRows(session, logicalPlan)
+      val explainString = ds.queryExecution.explainString(ExtendedMode)
+
+      val resp = proto.AnalyzeResponse
+        .newBuilder()
+        .setExplainString(explainString)
+        .setClientId(request.getClientId)
+
+      resp.addAllColumnTypes(ds.schema.fields.map(_.dataType.sql).toSeq.asJava)
+      resp.addAllColumnNames(ds.schema.fields.map(_.name).toSeq.asJava)
+      responseObserver.onNext(resp.build())
+      responseObserver.onCompleted()
+    } catch {
+      case e: Exception =>
+        log.error("Error analyzing plan.", e)
+        responseObserver.onError(
+          
Status.UNKNOWN.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
+    }
+  }
+}
+
+/**
+ * Trivial object used for referring to SparkSessions in the SessionCache.
+ *
+ * @param userId
+ * @param session
+ */
+@Experimental
+case class SessionHolder(userId: String, session: SparkSession) {}
+
+/**
+ * Satic instance of the SparkConnectService.
+ *
+ * Used to start the overall SparkConnect service and provides global state to 
manage the
+ * different SparkSession from different users connecting to the cluster.
+ */
+@Experimental
+object SparkConnectService {
+
+  // Type alias for the SessionCacheKey. Right now this is a String but allows 
us to switch to a
+  // different or complex type easily.
+  type SessionCacheKey = String;
+
+  var server: Server = _
+
+  private val userSessionMapping =
+    cacheBuilder(100, 3600).build[SessionCacheKey, SessionHolder]()
+
+  // Simple builder for creating the cache of Sessions.
+  private def cacheBuilder(cacheSize: Int, timeoutSeconds: Int): 
CacheBuilder[Object, Object] = {
+    var cacheBuilder = CacheBuilder.newBuilder().ticker(Ticker.systemTicker())
+    if (cacheSize >= 0) {
+      cacheBuilder = cacheBuilder.maximumSize(cacheSize)
+    }
+    if (timeoutSeconds >= 0) {
+      cacheBuilder.expireAfterAccess(timeoutSeconds, TimeUnit.SECONDS)
+    }
+    cacheBuilder
+  }
+
+  /**
+   * Based on the `key` find or create a new SparkSession.
+   */
+  def getOrCreateIsolatedSession(key: SessionCacheKey): SessionHolder = {
+    userSessionMapping.get(
+      key,
+      () => {
+        SessionHolder(key, newIsolatedSession())
+      })
+  }
+
+  private def newIsolatedSession(): SparkSession = {
+    SparkSession.active.newSession()
+  }
+
+  /**
+   * Starts the GRPC Serivce.
+   *
+   * TODO(martin.grund) Make port number configurable.

Review Comment:
   Ditto, it should better have a JIRA 
   ```suggestion
      * TODO(SPARK-XXXXX): Make port number configurable.
   ```



##########
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+
+/**
+ * The SparkConnectService Implementation.
+ *
+ * This class implements the service stub from the generated code of GRPC.
+ *
+ * @param debug
+ *   delegates debug behavior to the handlers.
+ */
+@Experimental
+@Since("3.3.1")
+class SparkConnectService(debug: Boolean)
+    extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
+    with Logging {
+
+  /**
+   * This is the main entry method for Spark Connect and all calls to execute 
a plan.
+   *
+   * The plan execution is delegated to the [[SparkConnectStreamHandler]]. All 
error handling
+   * should be directly implemented in the deferred implementation. But this 
method catches
+   * generic errors.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def executePlan(request: Request, responseObserver: 
StreamObserver[Response]): Unit = {
+    try {
+      new SparkConnectStreamHandler(responseObserver).handle(request)
+    } catch {
+      case e: Exception =>
+        log.error("Error executing plan.", e)
+        responseObserver.onError(
+          
Status.UNKNOWN.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
+    }
+  }
+
+  /**
+   * Analyze a plan provide metadata and debugging information.
+   *
+   * This method is called to generate the explain plan for a SparkConnect 
plan. In its simplest
+   * implementation, the plan that is generated by the [[SparkConnectPlanner]] 
is used to build a
+   * [[Dataset]] and derive the explain string from the query execution 
details.
+   *
+   * Errors during planning are returned via the [[StreamObserver]] interface.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def analyzePlan(
+      request: Request,
+      responseObserver: StreamObserver[AnalyzeResponse]): Unit = {
+    try {
+      val session =
+        
SparkConnectService.getOrCreateIsolatedSession(request.getUserContext.getUserId).session
+
+      val logicalPlan = request.getPlan.getOpTypeCase match {
+        case proto.Plan.OpTypeCase.ROOT =>
+          new SparkConnectPlanner(request.getPlan.getRoot, session).transform()
+        case _ =>
+          responseObserver.onError(
+            new UnsupportedOperationException(
+              s"${request.getPlan.getOpTypeCase} not supported for analysis."))
+          return
+      }
+      val ds = Dataset.ofRows(session, logicalPlan)
+      val explainString = ds.queryExecution.explainString(ExtendedMode)
+
+      val resp = proto.AnalyzeResponse
+        .newBuilder()
+        .setExplainString(explainString)
+        .setClientId(request.getClientId)
+
+      resp.addAllColumnTypes(ds.schema.fields.map(_.dataType.sql).toSeq.asJava)
+      resp.addAllColumnNames(ds.schema.fields.map(_.name).toSeq.asJava)
+      responseObserver.onNext(resp.build())
+      responseObserver.onCompleted()
+    } catch {
+      case e: Exception =>

Review Comment:
   ```suggestion
         case e: Throwable =>
   ```



##########
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+
+/**
+ * The SparkConnectService Implementation.
+ *
+ * This class implements the service stub from the generated code of GRPC.
+ *
+ * @param debug
+ *   delegates debug behavior to the handlers.
+ */
+@Experimental
+@Since("3.3.1")
+class SparkConnectService(debug: Boolean)
+    extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
+    with Logging {
+
+  /**
+   * This is the main entry method for Spark Connect and all calls to execute 
a plan.
+   *
+   * The plan execution is delegated to the [[SparkConnectStreamHandler]]. All 
error handling
+   * should be directly implemented in the deferred implementation. But this 
method catches
+   * generic errors.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def executePlan(request: Request, responseObserver: 
StreamObserver[Response]): Unit = {
+    try {
+      new SparkConnectStreamHandler(responseObserver).handle(request)
+    } catch {
+      case e: Exception =>

Review Comment:
   Maybe ..
   
   ```suggestion
         case e: Throwable =>
   ```
   
   



##########
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+
+/**
+ * The SparkConnectService Implementation.
+ *
+ * This class implements the service stub from the generated code of GRPC.
+ *
+ * @param debug
+ *   delegates debug behavior to the handlers.
+ */
+@Experimental
+@Since("3.3.1")
+class SparkConnectService(debug: Boolean)
+    extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
+    with Logging {
+
+  /**
+   * This is the main entry method for Spark Connect and all calls to execute 
a plan.
+   *
+   * The plan execution is delegated to the [[SparkConnectStreamHandler]]. All 
error handling
+   * should be directly implemented in the deferred implementation. But this 
method catches
+   * generic errors.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def executePlan(request: Request, responseObserver: 
StreamObserver[Response]): Unit = {
+    try {
+      new SparkConnectStreamHandler(responseObserver).handle(request)
+    } catch {
+      case e: Exception =>
+        log.error("Error executing plan.", e)
+        responseObserver.onError(
+          
Status.UNKNOWN.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
+    }
+  }
+
+  /**
+   * Analyze a plan provide metadata and debugging information.
+   *
+   * This method is called to generate the explain plan for a SparkConnect 
plan. In its simplest
+   * implementation, the plan that is generated by the [[SparkConnectPlanner]] 
is used to build a
+   * [[Dataset]] and derive the explain string from the query execution 
details.
+   *
+   * Errors during planning are returned via the [[StreamObserver]] interface.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def analyzePlan(
+      request: Request,
+      responseObserver: StreamObserver[AnalyzeResponse]): Unit = {
+    try {
+      val session =
+        
SparkConnectService.getOrCreateIsolatedSession(request.getUserContext.getUserId).session
+
+      val logicalPlan = request.getPlan.getOpTypeCase match {
+        case proto.Plan.OpTypeCase.ROOT =>
+          new SparkConnectPlanner(request.getPlan.getRoot, session).transform()
+        case _ =>
+          responseObserver.onError(
+            new UnsupportedOperationException(
+              s"${request.getPlan.getOpTypeCase} not supported for analysis."))
+          return
+      }
+      val ds = Dataset.ofRows(session, logicalPlan)
+      val explainString = ds.queryExecution.explainString(ExtendedMode)
+
+      val resp = proto.AnalyzeResponse
+        .newBuilder()
+        .setExplainString(explainString)
+        .setClientId(request.getClientId)
+
+      resp.addAllColumnTypes(ds.schema.fields.map(_.dataType.sql).toSeq.asJava)
+      resp.addAllColumnNames(ds.schema.fields.map(_.name).toSeq.asJava)
+      responseObserver.onNext(resp.build())
+      responseObserver.onCompleted()
+    } catch {
+      case e: Exception =>
+        log.error("Error analyzing plan.", e)
+        responseObserver.onError(
+          
Status.UNKNOWN.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
+    }
+  }
+}
+
+/**
+ * Trivial object used for referring to SparkSessions in the SessionCache.
+ *
+ * @param userId
+ * @param session
+ */
+@Experimental
+case class SessionHolder(userId: String, session: SparkSession) {}
+
+/**
+ * Satic instance of the SparkConnectService.
+ *
+ * Used to start the overall SparkConnect service and provides global state to 
manage the
+ * different SparkSession from different users connecting to the cluster.
+ */
+@Experimental

Review Comment:
   Ditto, `Unstable`



##########
python/pyspark/sql/connect/plan.py:
##########
@@ -0,0 +1,468 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import (
+    List,
+    Optional,
+    Sequence,
+    Tuple,
+    Union,
+    cast,
+    TYPE_CHECKING,
+)
+
+import pyspark.sql.connect.proto as proto
+from pyspark.sql.connect.column import (
+    ColumnOrString,
+    ColumnRef,
+    Expression,
+    ExpressionOrString,
+    SortOrder,
+)
+
+
+if TYPE_CHECKING:
+    from pyspark.sql.connect.client import RemoteSparkSession
+
+
+class InputValidationError(Exception):
+    pass
+
+
+class LogicalPlan(object):
+
+    INDENT = 2
+
+    def __init__(self, child: Optional["LogicalPlan"]) -> None:
+        self._child = child
+
+    def unresolved_attr(self, *colNames: str) -> proto.Expression:
+        """Creates an unresolved attribute from a column name."""
+        exp = proto.Expression()
+        exp.unresolved_attribute.parts.extend(list(colNames))
+        return exp
+
+    def to_attr_or_expression(
+        self, col: ColumnOrString, session: "RemoteSparkSession"
+    ) -> proto.Expression:
+        """Returns either an instance of an unresolved attribute or the 
serialized
+        expression value of the column."""
+        if type(col) is str:
+            return self.unresolved_attr(cast(str, col))
+        else:
+            return cast(ColumnRef, col).to_plan(session)
+
+    def plan(self, session: "RemoteSparkSession") -> proto.Relation:
+        ...
+
+    def _verify(self, session: "RemoteSparkSession") -> bool:
+        """This method is used to verify that the current logical plan
+        can be serialized to Proto and back and afterwards is identical."""
+        plan = proto.Plan()
+        plan.root.CopyFrom(self.plan(session))
+
+        serialized_plan = plan.SerializeToString()
+        test_plan = proto.Plan()
+        test_plan.ParseFromString(serialized_plan)
+
+        return test_plan == plan
+
+    # TODO(martin.grund) explain , schema
+    def collect(self, session: "RemoteSparkSession" = None, debug: bool = 
False):
+        plan = proto.Plan()
+        plan.root.CopyFrom(self.plan(session))
+
+        if debug:
+            print(plan)
+
+        return plan
+
+    def _i(self, indent) -> str:

Review Comment:
   What is this? The name `_i` seems difficult to follow.



##########
connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.command
+
+import scala.collection.JavaConverters._
+
+import com.google.common.collect.{Lists, Maps}
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
+import org.apache.spark.sql.types.StringType
+
+@Experimental
+@Since("3.3.1")
+class SparkConnectCommandPlanner(session: SparkSession, command: 
proto.Command) {
+
+  lazy val pythonVersion =
+    sys.env.getOrElse("PYSPARK_PYTHON", 
sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python3"))
+
+  def process(): Unit = {
+    command.getCommandTypeCase match {
+      case proto.Command.CommandTypeCase.CREATE_FUNCTION =>
+        handleCreateScalarFunction(command.getCreateFunction)
+      case _ => throw new UnsupportedOperationException(s"${command} not 
supported.")
+    }
+  }
+
+  // This is a helper function that registers a new Python function in the
+  // [[SparkSession]].
+  def handleCreateScalarFunction(cf: proto.CreateScalarFunction): Unit = {
+    val function = SimplePythonFunction(
+      cf.getSerializedFunction.toByteArray,
+      Maps.newHashMap(),
+      Lists.newArrayList(),
+      pythonVersion,
+      "3.9", // TODO This needs to be an actual version.

Review Comment:
   Would be better to file a JIRA. e.g.)
   
   ```suggestion
         "3.9", // TODO(SPARK-XXXX): This needs to be an actual version.
   ```



##########
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+import io.grpc.{Server, Status}
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{
+  AnalyzeResponse,
+  Request,
+  Response,
+  SparkConnectServiceGrpc
+}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.ExtendedMode
+
+/**
+ * The SparkConnectService Implementation.
+ *
+ * This class implements the service stub from the generated code of GRPC.
+ *
+ * @param debug
+ *   delegates debug behavior to the handlers.
+ */
+@Experimental
+@Since("3.3.1")
+class SparkConnectService(debug: Boolean)
+    extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
+    with Logging {
+
+  /**
+   * This is the main entry method for Spark Connect and all calls to execute 
a plan.
+   *
+   * The plan execution is delegated to the [[SparkConnectStreamHandler]]. All 
error handling
+   * should be directly implemented in the deferred implementation. But this 
method catches
+   * generic errors.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def executePlan(request: Request, responseObserver: 
StreamObserver[Response]): Unit = {
+    try {
+      new SparkConnectStreamHandler(responseObserver).handle(request)
+    } catch {
+      case e: Exception =>
+        log.error("Error executing plan.", e)
+        responseObserver.onError(
+          
Status.UNKNOWN.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
+    }
+  }
+
+  /**
+   * Analyze a plan provide metadata and debugging information.
+   *
+   * This method is called to generate the explain plan for a SparkConnect 
plan. In its simplest
+   * implementation, the plan that is generated by the [[SparkConnectPlanner]] 
is used to build a
+   * [[Dataset]] and derive the explain string from the query execution 
details.
+   *
+   * Errors during planning are returned via the [[StreamObserver]] interface.
+   *
+   * @param request
+   * @param responseObserver
+   */
+  override def analyzePlan(
+      request: Request,
+      responseObserver: StreamObserver[AnalyzeResponse]): Unit = {
+    try {
+      val session =
+        
SparkConnectService.getOrCreateIsolatedSession(request.getUserContext.getUserId).session
+
+      val logicalPlan = request.getPlan.getOpTypeCase match {
+        case proto.Plan.OpTypeCase.ROOT =>
+          new SparkConnectPlanner(request.getPlan.getRoot, session).transform()
+        case _ =>
+          responseObserver.onError(
+            new UnsupportedOperationException(
+              s"${request.getPlan.getOpTypeCase} not supported for analysis."))
+          return
+      }
+      val ds = Dataset.ofRows(session, logicalPlan)
+      val explainString = ds.queryExecution.explainString(ExtendedMode)
+
+      val resp = proto.AnalyzeResponse
+        .newBuilder()
+        .setExplainString(explainString)
+        .setClientId(request.getClientId)
+
+      resp.addAllColumnTypes(ds.schema.fields.map(_.dataType.sql).toSeq.asJava)
+      resp.addAllColumnNames(ds.schema.fields.map(_.name).toSeq.asJava)
+      responseObserver.onNext(resp.build())
+      responseObserver.onCompleted()
+    } catch {
+      case e: Exception =>
+        log.error("Error analyzing plan.", e)
+        responseObserver.onError(
+          
Status.UNKNOWN.withCause(e).withDescription(e.getLocalizedMessage).asRuntimeException())
+    }
+  }
+}
+
+/**
+ * Trivial object used for referring to SparkSessions in the SessionCache.
+ *
+ * @param userId
+ * @param session
+ */
+@Experimental
+case class SessionHolder(userId: String, session: SparkSession) {}
+
+/**
+ * Satic instance of the SparkConnectService.
+ *
+ * Used to start the overall SparkConnect service and provides global state to 
manage the
+ * different SparkSession from different users connecting to the cluster.
+ */
+@Experimental
+object SparkConnectService {
+
+  // Type alias for the SessionCacheKey. Right now this is a String but allows 
us to switch to a
+  // different or complex type easily.
+  type SessionCacheKey = String;
+
+  var server: Server = _
+
+  private val userSessionMapping =
+    cacheBuilder(100, 3600).build[SessionCacheKey, SessionHolder]()
+
+  // Simple builder for creating the cache of Sessions.
+  private def cacheBuilder(cacheSize: Int, timeoutSeconds: Int): 
CacheBuilder[Object, Object] = {
+    var cacheBuilder = CacheBuilder.newBuilder().ticker(Ticker.systemTicker())
+    if (cacheSize >= 0) {
+      cacheBuilder = cacheBuilder.maximumSize(cacheSize)
+    }
+    if (timeoutSeconds >= 0) {
+      cacheBuilder.expireAfterAccess(timeoutSeconds, TimeUnit.SECONDS)
+    }
+    cacheBuilder
+  }
+
+  /**
+   * Based on the `key` find or create a new SparkSession.
+   */
+  def getOrCreateIsolatedSession(key: SessionCacheKey): SessionHolder = {
+    userSessionMapping.get(
+      key,
+      () => {
+        SessionHolder(key, newIsolatedSession())
+      })
+  }
+
+  private def newIsolatedSession(): SparkSession = {
+    SparkSession.active.newSession()
+  }
+
+  /**
+   * Starts the GRPC Serivce.
+   *
+   * TODO(martin.grund) Make port number configurable.
+   */
+  def startGRPCService(): Unit = {
+    val debugMode = 
SparkEnv.get.conf.getBoolean("spark.connect.grpc.debug.enabled", true)
+    val port = 15002
+    val sb = NettyServerBuilder
+      .forPort(port)
+      .addService(new SparkConnectService(debugMode))
+
+    // If debug mode is configured, load the ProtoReflection service so that 
tools like
+    // grpcurl can introspect the API for debugging.
+    if (debugMode) {
+      sb.addService(ProtoReflectionService.newInstance())
+    }
+    server = sb.build
+    server.start()
+  }
+
+  // Starts the service
+  def start(): Unit = {
+    startGRPCService()
+  }
+
+  def stop(): Unit = {
+    if (server != null) {
+      server.shutdownNow()
+    }
+  }
+}
+
+/**
+ * This is the main entry point for Spark Connect.
+ *
+ * To decouple the build of Spark Connect and it's dependencies from the core 
of Spark, we
+ * implement it as a Driver Plugin. To enable Spark Connect, simply make sure 
that the appropriate
+ * JAR is available in the CLASSPATH and the driver plugin is configured to 
load this class.
+ */
+@Experimental

Review Comment:
   `Unstable`



##########
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.ByteString
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{Request, Response}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.connect.command.SparkConnectCommandPlanner
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.{
+  AdaptiveSparkPlanExec,
+  AdaptiveSparkPlanHelper,
+  QueryStageExec
+}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.util.ArrowUtils
+
+@Experimental
+@Since("3.3.1")

Review Comment:
   ```suggestion
   @Unstable
   @Since("3.4.0")
   ```



##########
connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.ByteString
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{Request, Response}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.connect.command.SparkConnectCommandPlanner
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.{
+  AdaptiveSparkPlanExec,
+  AdaptiveSparkPlanHelper,
+  QueryStageExec
+}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.util.ArrowUtils
+
+@Experimental
+@Since("3.3.1")
+class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) 
extends Logging {
+
+  def handle(v: Request): Unit = {
+    // Preconditions.checkState(v.userContext.nonEmpty, "User Context must be 
present")
+    val session =
+      
SparkConnectService.getOrCreateIsolatedSession(v.getUserContext.getUserId).session
+    v.getPlan.getOpTypeCase match {
+      case proto.Plan.OpTypeCase.COMMAND => handleCommand(session, v)
+      case proto.Plan.OpTypeCase.ROOT => handlePlan(session, v)
+      case _ =>
+        throw new UnsupportedOperationException(s"${v.getPlan.getOpTypeCase} 
not supported.")
+    }
+  }
+
+  def handlePlan(session: SparkSession, request: proto.Request): Unit = {
+    // Extract the plan from the request and convert it to a logical plan
+    val planner = new SparkConnectPlanner(request.getPlan.getRoot, session)
+    val rows =
+      Dataset.ofRows(session, planner.transform())
+    processRows(request.getClientId, rows)
+  }
+
+  private def processRows(clientId: String, rows: DataFrame) = {
+    val timeZoneId = SQLConf.get.sessionLocalTimeZone
+    val schema =
+      ByteString.copyFrom(ArrowUtils.toArrowSchema(rows.schema, 
timeZoneId).toByteArray)
+
+    val textSchema = rows.schema.fields.map(f => f.name).mkString("|")
+
+    // TODO empty results (except limit 0) will not yield a schema.

Review Comment:
   ditto for filing a JIRA



##########
python/pyspark/sql/connect/plan.py:
##########
@@ -0,0 +1,468 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import (
+    List,
+    Optional,
+    Sequence,
+    Tuple,
+    Union,
+    cast,
+    TYPE_CHECKING,
+)
+
+import pyspark.sql.connect.proto as proto
+from pyspark.sql.connect.column import (
+    ColumnOrString,
+    ColumnRef,
+    Expression,
+    ExpressionOrString,
+    SortOrder,
+)
+
+
+if TYPE_CHECKING:
+    from pyspark.sql.connect.client import RemoteSparkSession
+
+
+class InputValidationError(Exception):
+    pass
+
+
+class LogicalPlan(object):
+
+    INDENT = 2
+
+    def __init__(self, child: Optional["LogicalPlan"]) -> None:
+        self._child = child
+
+    def unresolved_attr(self, *colNames: str) -> proto.Expression:
+        """Creates an unresolved attribute from a column name."""
+        exp = proto.Expression()
+        exp.unresolved_attribute.parts.extend(list(colNames))
+        return exp
+
+    def to_attr_or_expression(
+        self, col: ColumnOrString, session: "RemoteSparkSession"
+    ) -> proto.Expression:
+        """Returns either an instance of an unresolved attribute or the 
serialized
+        expression value of the column."""
+        if type(col) is str:
+            return self.unresolved_attr(cast(str, col))
+        else:
+            return cast(ColumnRef, col).to_plan(session)
+
+    def plan(self, session: "RemoteSparkSession") -> proto.Relation:
+        ...
+
+    def _verify(self, session: "RemoteSparkSession") -> bool:
+        """This method is used to verify that the current logical plan
+        can be serialized to Proto and back and afterwards is identical."""
+        plan = proto.Plan()
+        plan.root.CopyFrom(self.plan(session))
+
+        serialized_plan = plan.SerializeToString()
+        test_plan = proto.Plan()
+        test_plan.ParseFromString(serialized_plan)
+
+        return test_plan == plan
+
+    # TODO(martin.grund) explain , schema
+    def collect(self, session: "RemoteSparkSession" = None, debug: bool = 
False):
+        plan = proto.Plan()
+        plan.root.CopyFrom(self.plan(session))
+
+        if debug:
+            print(plan)
+
+        return plan
+
+    def _i(self, indent) -> str:
+        return " " * indent
+
+    def print(self, indent=0) -> str:
+        ...
+
+    def _repr_html_(self):
+        ...
+
+
+class Read(LogicalPlan):
+    def __init__(self, table_name: str) -> None:
+        super().__init__(None)
+        self.table_name = table_name
+
+    def plan(self, session: "RemoteSparkSession") -> proto.Relation:
+        plan = proto.Relation()
+        plan.read.named_table.parts.extend(self.table_name.split("."))
+        return plan
+
+    def print(self, indent=0) -> str:
+        return f"{self._i(indent)}<Read table_name={self.table_name}>\n"
+
+    def _repr_html_(self):
+        return f"""
+        <ul>
+            <li>
+                <b>Read</b><br />
+                table name: {self.table_name}
+            </li>
+        </ul>
+        """
+
+
+class Project(LogicalPlan):
+    """Logical plan object for a projection.
+
+    All input arguments are directly serialized into the corresponding 
protocol buffer
+    objects. This class only provides very limited error handling and input 
validation.
+
+    To be compatible with PySpark, we validate that the input arguments are all
+    expressions to be able to serialize them to the server.
+
+    """
+
+    def __init__(self, child: Optional["LogicalPlan"], *columns: 
ExpressionOrString) -> None:
+        super().__init__(child)
+        self._raw_columns = list(columns)
+        self.alias = None
+        self._verify_expressions()
+
+    def _verify_expressions(self):
+        """Ensures that all input arguments are instances of Expression."""
+        for c in self._raw_columns:
+            if not isinstance(c, Expression):
+                raise InputValidationError(f"Only Expressions can be used for 
projections: '{c}'.")
+
+    def withAlias(self, alias) -> LogicalPlan:
+        self.alias = alias
+        return self
+
+    def plan(self, session: "RemoteSparkSession") -> proto.Relation:
+        assert self._child is not None
+        proj_exprs = [
+            c.to_plan(session)
+            if isinstance(c, Expression)
+            else self.unresolved_attr(*cast(str, c).split("."))
+            for c in self._raw_columns
+        ]  # [self.unresolved_attr(*x) for x in self.columns]
+        common = proto.RelationCommon()
+        if self.alias is not None:
+            common.alias = self.alias
+
+        plan = proto.Relation()
+        plan.project.input.CopyFrom(self._child.plan(session))
+        plan.project.expressions.extend(proj_exprs)
+        plan.common.CopyFrom(common)
+        return plan
+
+    def print(self, indent=0) -> str:
+        c_buf = self._child.print(indent + LogicalPlan.INDENT) if self._child 
else ""
+        return f"{self._i(indent)}<Project cols={self._raw_columns}>\n{c_buf}"
+
+    def _repr_html_(self):
+        return f"""
+        <ul>
+            <li>
+                <b>Project</b><br />
+                Columns: {",".join([str(c) for c in self._raw_columns])}
+                {self._child._repr_html_()}
+            </li>
+        </uL>
+        """
+
+
+class Filter(LogicalPlan):
+    def __init__(self, child: Optional["LogicalPlan"], filter: Expression) -> 
None:
+        super().__init__(child)
+        self.filter = filter
+
+    def plan(self, session: "RemoteSparkSession") -> proto.Relation:
+        assert self._child is not None
+        plan = proto.Relation()
+        plan.filter.input.CopyFrom(self._child.plan(session))
+        plan.filter.condition.CopyFrom(self.filter.to_plan(session))
+        return plan
+
+    def print(self, indent=0) -> str:
+        c_buf = self._child.print(indent + LogicalPlan.INDENT) if self._child 
else ""
+        return f"{self._i(indent)}<Filter filter={self.filter}>\n{c_buf}"
+
+    def _repr_html_(self):
+        return f"""
+        <ul>
+            <li>
+                <b>Filter</b><br />
+                Condition: {self.filter}
+                {self._child._repr_html_()}
+            </li>
+        </uL>
+        """
+
+
+class Limit(LogicalPlan):
+    def __init__(self, child: Optional["LogicalPlan"], limit: int, offset: int 
= 0) -> None:
+        super().__init__(child)
+        self.limit = limit
+        self.offset = offset
+
+    def plan(self, session: "RemoteSparkSession") -> proto.Relation:
+        assert self._child is not None
+        plan = proto.Relation()
+        plan.fetch.input.CopyFrom(self._child.plan(session))
+        plan.fetch.limit = self.limit
+        return plan
+
+    def print(self, indent=0) -> str:
+        c_buf = self._child.print(indent + LogicalPlan.INDENT) if self._child 
else ""
+        return f"{self._i(indent)}<Limit limit={self.limit} 
offset={self.offset}>\n{c_buf}"
+
+    def _repr_html_(self):
+        return f"""
+        <ul>
+            <li>
+                <b>Limit</b><br />
+                Limit: {self.limit} <br />
+                Offset: {self.offset} <br />
+                {self._child._repr_html_()}
+            </li>
+        </uL>
+        """
+
+
+class Sort(LogicalPlan):
+    def __init__(
+        self, child: Optional["LogicalPlan"], *columns: Union[SortOrder, 
ColumnRef, str]
+    ) -> None:
+        super().__init__(child)
+        self.columns = list(columns)
+
+    def col_to_sort_field(
+        self, col: Union[SortOrder, ColumnRef, str], session: 
"RemoteSparkSession"
+    ) -> proto.Sort.SortField:
+        if type(col) is SortOrder:
+            so = cast(SortOrder, col)
+            sf = proto.Sort.SortField()
+            sf.expression.CopyFrom(so.ref.to_plan(session))
+            sf.direction = (
+                proto.Sort.SortDirection.SORT_DIRECTION_ASCENDING
+                if so.ascending
+                else proto.Sort.SortDirection.SORT_DIRECTION_DESCENDING
+            )
+            sf.nulls = (
+                proto.Sort.SortNulls.SORT_NULLS_FIRST
+                if not so.nullsLast
+                else proto.Sort.SortNulls.SORT_NULLS_LAST
+            )
+            return sf
+        else:
+            sf = proto.Sort.SortField()
+            # Check string
+            if type(col) is ColumnRef:
+                sf.expression.CopyFrom(cast(ColumnRef, col).to_plan(session))
+            else:
+                sf.expression.CopyFrom(self.unresolved_attr(cast(str, col)))
+            sf.direction = proto.Sort.SortDirection.SORT_DIRECTION_ASCENDING
+            sf.nulls = proto.Sort.SortNulls.SORT_NULLS_LAST
+            return sf
+
+    def plan(self, session: "RemoteSparkSession") -> proto.Relation:
+        assert self._child is not None
+        plan = proto.Relation()
+        plan.sort.input.CopyFrom(self._child.plan(session))
+        plan.sort.sort_fields.extend([self.col_to_sort_field(x, session) for x 
in self.columns])
+        return plan
+
+    def print(self, indent=0) -> str:
+        c_buf = self._child.print(indent + LogicalPlan.INDENT) if self._child 
else ""
+        return f"{self._i(indent)}<Sort columns={self.columns}>\n{c_buf}"
+
+    def _repr_html_(self):
+        return f"""
+        <ul>
+            <li>
+                <b>Sort</b><br />
+                {", ".join([str(c) for c in self.columns])}
+                {self._child._repr_html_()}
+            </li>
+        </uL>
+        """
+
+
+class Aggregate(LogicalPlan):
+    MeasuresType = Sequence[Tuple[ExpressionOrString, str]]
+    OptMeasuresType = Optional[MeasuresType]
+
+    def __init__(
+        self,
+        child: Optional["LogicalPlan"],
+        grouping_cols: List[ColumnRef],
+        measures: OptMeasuresType,
+    ) -> None:
+        super().__init__(child)
+        self.grouping_cols = grouping_cols
+        self.measures = measures if measures is not None else []
+
+    def _convert_measure(self, m, session: "RemoteSparkSession"):
+        exp, fun = m
+        measure = proto.Aggregate.Measure()
+        measure.function.name = fun
+        if type(exp) is str:
+            measure.function.arguments.append(self.unresolved_attr(exp))
+        else:
+            measure.function.arguments.append(cast(Expression, 
exp).to_plan(session))
+        return measure
+
+    def plan(self, session: "RemoteSparkSession") -> proto.Relation:
+        assert self._child is not None
+        groupings = [x.to_plan(session) for x in self.grouping_cols]
+
+        agg = proto.Relation()
+        agg.aggregate.input.CopyFrom(self._child.plan(session))
+        agg.aggregate.measures.extend(
+            list(map(lambda x: self._convert_measure(x, session), 
self.measures))
+        )
+
+        gs = proto.Aggregate.GroupingSet()
+        gs.aggregate_expressions.extend(groupings)
+        agg.aggregate.grouping_sets.append(gs)
+        return agg
+
+    def print(self, indent=0) -> str:
+        c_buf = self._child.print(indent + LogicalPlan.INDENT) if self._child 
else ""
+        return (
+            f"{self._i(indent)}<Sort columns={self.grouping_cols}"
+            f"measures={self.measures}>\n{c_buf}"
+        )
+
+    def _repr_html_(self):
+        return f"""
+        <ul>
+            <li>
+                <b>Aggregation</b><br />
+                {self._child._repr_html_()}
+            </li>
+        </uL>
+        """
+
+
+class Join(LogicalPlan):
+    def __init__(
+        self,
+        left: Optional["LogicalPlan"],
+        right: "LogicalPlan",
+        on: ColumnOrString,
+        how: proto.Join.JoinType = proto.Join.JoinType.JOIN_TYPE_INNER,
+    ) -> None:
+        super().__init__(left)
+        self.left = cast(LogicalPlan, left)
+        self.right = right
+        self.on = on
+        if how is None:
+            how = proto.Join.JoinType.JOIN_TYPE_INNER
+        self.how = how
+
+    def plan(self, session: "RemoteSparkSession") -> proto.Relation:
+        rel = proto.Relation()
+        rel.join.left.CopyFrom(self.left.plan(session))
+        rel.join.right.CopyFrom(self.right.plan(session))
+        rel.join.on.CopyFrom(self.to_attr_or_expression(self.on, session))
+        return rel
+
+    def print(self, indent=0) -> str:
+        i = self._i(indent)
+        o = self._i(indent + LogicalPlan.INDENT)
+        n = indent + LogicalPlan.INDENT * 2
+        return (
+            f"{i}<Join on={self.on} how={self.how}>\n{o}"
+            f"left=\n{self.left.print(n)}\n{o}right=\n{self.right.print(n)}"
+        )
+
+    def _repr_html_(self):
+        return f"""
+        <ul>
+            <li>
+                <b>Join</b><br />
+                Left: {self.left._repr_html_()}
+                Right: {self.right._repr_html_()}
+            </li>
+        </uL>
+        """
+
+
+class UnionAll(LogicalPlan):
+    def __init__(self, child: Optional["LogicalPlan"], other: "LogicalPlan") 
-> None:
+        super().__init__(child)
+        self.other = other
+
+    def plan(self, session: "RemoteSparkSession") -> proto.Relation:
+        assert self._child is not None
+        rel = proto.Relation()
+        rel.union.inputs.extend([self._child.plan(session), 
self.other.plan(session)])
+        rel.union.union_type = proto.Union.UnionType.UNION_TYPE_ALL
+
+    def print(self, indent=0) -> str:
+        assert self._child is not None
+        assert self.other is not None
+
+        i = self._i(indent)
+        o = self._i(indent + LogicalPlan.INDENT)
+        n = indent + LogicalPlan.INDENT * 2
+        return (
+            f"{i}UnionAll\n{o}child1=\n{self._child.print(n)}"
+            f"\n{o}child2=\n{self.other.print(n)}"
+        )
+
+    def _repr_html_(self) -> str:
+        assert self._child is not None
+        assert self.other is not None
+
+        return f"""
+        <ul>
+            <li>
+                <b>Union</b><br />
+                Left: {self._child._repr_html_()}
+                Right: {self.other._repr_html_()}
+            </li>
+        </uL>
+        """
+
+
+class Sql(LogicalPlan):

Review Comment:
   Maybe `SQL`



##########
dev/infra/Dockerfile:
##########
@@ -65,3 +65,6 @@ RUN Rscript -e "devtools::install_version('roxygen2', 
version='7.2.0', repos='ht
 
 # See more in SPARK-39735
 ENV R_LIBS_SITE 
"/usr/local/lib/R/site-library:${R_LIBS_SITE}:/usr/lib/R/library"
+
+# Add Python Deps for Spark Connect.

Review Comment:
   ```suggestion
   # Add Python deps for Spark Connect.
   ```



##########
dev/tox.ini:
##########
@@ -51,4 +51,6 @@ exclude =
     python/pyspark/worker.pyi,
     python/pyspark/java_gateway.pyi,
     dev/ansible-for-test-node/*,
+    python/pyspark/sql/connect/proto/*,

Review Comment:
   Should add a TODO with a JIRA (enabling it)



##########
project/SparkBuild.scala:
##########
@@ -593,6 +608,60 @@ object Core {
   )
 }
 
+
+object SparkConnect {
+
+  import BuildCommons.protoVersion
+
+  private val shadePrefix = "org.sparkproject.connect"
+  val shadeJar = taskKey[Unit]("Shade the Jars")
+
+  lazy val settings = Seq(
+    // Setting version for the protobuf compiler. This has to be propagated to 
every sub-project
+    // even if the project is not using it.
+    PB.protocVersion := BuildCommons.protoVersion,
+
+    // For some reason the resolution from the imported Maven build does not 
work for some
+    // of these dependendencies that we need to shade later on.
+    libraryDependencies ++= Seq(
+      "io.grpc"          % "protoc-gen-grpc-java" % BuildCommons.gprcVersion 
asProtocPlugin(),
+      "org.scala-lang" % "scala-library" % "2.12.16" % "provided",
+      "com.google.guava" % "guava"                % "31.0.1-jre",

Review Comment:
   Would better to have the indentation same as others in this file.



##########
python/mypy.ini:
##########
@@ -23,6 +23,16 @@ show_error_codes = True
 warn_unused_ignores = True
 warn_redundant_casts = True
 
+[mypy-pyspark.sql.connect.*]

Review Comment:
   Would need a JIRA for this too.



##########
python/pyspark/sql/connect/README.md:
##########
@@ -0,0 +1,34 @@
+
+# [EXPERIMENTAL] Spark Connect
+
+**Spark Connect is a strictly experimental feature and under heavy development.
+All APIs should be considered volatile and should not be used in production.**
+
+This module contains the implementation of Spark Connect which is a logical 
plan
+facade for the implementation in Spark. Spark Connect is directly integrated 
into the build
+of Spark. To enable it, you only need to activate the driver plugin for Spark 
Connect.
+
+
+
+
+## Build
+
+1. Build Spark as usual per the documentation.
+2. Build and package the Spark Connect package
+   ```commandline
+   ./build/mvn package

Review Comment:
   Do we have a command for SBT too?



##########
python/pyspark/sql/connect/column.py:
##########
@@ -0,0 +1,181 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import List, Union, cast, get_args, TYPE_CHECKING
+
+import pyspark.sql.connect.proto as proto
+
+PrimitiveType = Union[str, int, bool, float]
+ExpressionOrString = Union[str, "Expression"]
+ColumnOrString = Union[str, "ColumnRef"]
+
+if TYPE_CHECKING:
+    from pyspark.sql.connect.client import RemoteSparkSession
+    import pyspark.sql.connect.proto as proto
+
+
+class Expression(object):
+    """
+    Expression base class.
+    """
+
+    def __init__(self) -> None:  # type: ignore[name-defined]
+        pass
+
+    def to_plan(self, session: "RemoteSparkSession") -> "proto.Expression":  # 
type: ignore
+        ...
+
+    def __str__(self) -> str:
+        ...
+
+
+class LiteralExpression(Expression):
+    """A literal expression.
+
+    The Python types are converted best effort into the relevant proto types. 
On the Spark Connect
+    server side, the proto types are converted to the Catalyst equivalents."""
+
+    def __init__(self, value: PrimitiveType) -> None:  # type: 
ignore[name-defined]
+        super().__init__()
+        self._value = value
+
+    def to_plan(self, session: "RemoteSparkSession") -> "proto.Expression":
+        """Converts the literal expression to the literal in proto.
+
+        TODO This method always assumes the largest type and can thus

Review Comment:
   Would need a JIRA.



##########
python/pyspark/sql/connect/data_frame.py:
##########
@@ -0,0 +1,241 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import (
+    Any,
+    Dict,
+    List,
+    Optional,
+    Sequence,
+    Tuple,
+    Union,
+    cast,
+    TYPE_CHECKING,
+)
+
+import pyspark.sql.connect.plan as plan
+from pyspark.sql.connect.column import (
+    ColumnOrString,
+    ColumnRef,
+    Expression,
+    ExpressionOrString,
+    LiteralExpression,
+)
+
+if TYPE_CHECKING:
+    from pyspark.sql.connect.client import RemoteSparkSession
+
+
+ColumnOrName = Union[ColumnRef, str]
+
+
+class GroupingFrame(object):
+
+    MeasuresType = Union[Sequence[Tuple[ExpressionOrString, str]], Dict[str, 
str]]
+    OptMeasuresType = Optional[MeasuresType]
+
+    def __init__(self, df: "DataFrame", *grouping_cols: Union[ColumnRef, str]) 
-> None:
+        self._df = df
+        self._grouping_cols = [x if isinstance(x, ColumnRef) else df[x] for x 
in grouping_cols]
+
+    def agg(self, exprs: MeasuresType = None) -> "DataFrame":
+
+        # Normalize the dictionary into a list of tuples.
+        if isinstance(exprs, Dict):
+            measures = list(exprs.items())
+        elif isinstance(exprs, List):
+            measures = exprs
+        else:
+            measures = []
+
+        res = DataFrame.withPlan(
+            plan.Aggregate(
+                child=self._df._plan,
+                grouping_cols=self._grouping_cols,
+                measures=measures,
+            ),
+            session=self._df._session,
+        )
+        return res
+
+    def _map_cols_to_dict(self, fun: str, cols: List[Union[ColumnRef, str]]) 
-> Dict[str, str]:
+        return {x if isinstance(x, str) else cast(ColumnRef, x).name(): fun 
for x in cols}
+
+    def min(self, *cols: Union[ColumnRef, str]) -> "DataFrame":
+        expr = self._map_cols_to_dict("min", list(cols))
+        return self.agg(expr)
+
+    def max(self, *cols: Union[ColumnRef, str]) -> "DataFrame":
+        expr = self._map_cols_to_dict("max", list(cols))
+        return self.agg(expr)
+
+    def sum(self, *cols: Union[ColumnRef, str]) -> "DataFrame":
+        expr = self._map_cols_to_dict("sum", list(cols))
+        return self.agg(expr)
+
+    def count(self) -> "DataFrame":
+        return self.agg([(LiteralExpression(1), "count")])
+
+
+class DataFrame(object):
+    """Every DataFrame object essentially is a Relation that is refined using 
the
+    member functions. Calling a method on a dataframe will essentially return 
a copy
+    of the DataFrame with the changes applied.
+    """
+
+    def __init__(self, data: List[Any] = None, schema: List[str] = None):
+        """Creates a new data frame"""
+        self._schema = schema
+        self._plan: Optional[plan.LogicalPlan] = None
+        self._cache: Dict[str, Any] = {}
+        self._session: "RemoteSparkSession" = None
+
+    @classmethod
+    def withPlan(cls, plan: plan.LogicalPlan, session=None) -> "DataFrame":
+        """Main initialization method used to construct a new data frame with 
a child plan."""
+        new_frame = DataFrame()
+        new_frame._plan = plan
+        new_frame._session = session
+        return new_frame
+
+    def select(self, *cols: ColumnRef) -> "DataFrame":
+        return DataFrame.withPlan(plan.Project(self._plan, *cols), 
session=self._session)
+
+    def agg(self, exprs: Dict[str, str]) -> "DataFrame":
+        return self.groupBy().agg(exprs)
+
+    def alias(self, alias):
+        return DataFrame.withPlan(plan.Project(self._plan).withAlias(alias), 
session=self._session)
+
+    def approxQuantile(self, col, probabilities, relativeError):
+        ...
+
+    def colRegex(self, regex) -> "DataFrame":
+        # TODO needs analysis to pick the right column
+        ...
+
+    @property
+    def columns(self) -> List[str]:
+        """Returns the list of columns of the current data frame."""
+        if self._plan is None:
+            return []
+        if "columns" not in self._cache and self._plan is not None:
+            pdd = self.limit(0).collect()
+            # Translate to standard pytho array
+            self._cache["columns"] = pdd.columns.values
+        return self._cache["columns"]
+
+    def count(self):
+        """Returns the number of rows in the data frame"""
+        return self.agg([(LiteralExpression(1), "count")]).collect().iloc[0, 0]
+
+    def crossJoin(self, other):
+        ...
+
+    def coalesce(self, num_partitions: int) -> "DataFrame":
+        # TODO needs repartition operator for substrait

Review Comment:
   JIRA



##########
project/plugins.sbt:
##########
@@ -44,3 +44,5 @@ libraryDependencies += "org.ow2.asm"  % "asm-commons" % "9.3"
 addSbtPlugin("com.simplytyped" % "sbt-antlr4" % "0.8.3")
 
 addSbtPlugin("com.typesafe.sbt" % "sbt-pom-reader" % "2.2.0")
+
+addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.1")

Review Comment:
   What is this for?



##########
python/pyspark/sql/connect/function_builder.py:
##########
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import functools
+from typing import TYPE_CHECKING
+
+import pyspark.sql.types
+from pyspark.sql.connect.column import (
+    ColumnOrString,
+    ColumnRef,
+    Expression,
+    ExpressionOrString,
+    ScalarFunctionExpression,
+)
+
+if TYPE_CHECKING:
+    from pyspark.sql.connect.client import RemoteSparkSession
+
+
+def _build(name: str, *args: ExpressionOrString) -> ScalarFunctionExpression:
+    """
+    Simple wrapper function that converts the arguments into the appropriate 
types.
+    Parameters
+    ----------
+    name Name of the function to be called.
+    args The list of arguments.
+
+    Returns
+    -------
+    :class:`ScalarFunctionExpression`
+    """
+    cols = [x if isinstance(x, Expression) else 
ColumnRef.from_qualified_name(x) for x in args]
+    return ScalarFunctionExpression(name, *cols)
+
+
+class FunctionBuilder:
+    """This class is used to build arbitrary functions used in expressions"""
+
+    def __getattr__(self, name):
+        def _(*args: ExpressionOrString) -> ScalarFunctionExpression:
+            return _build(name, *args)
+
+        _.__doc__ = f"""Function to apply {name}"""
+        return _
+
+
+functions = FunctionBuilder()
+
+
+class UserDefinedFunction(Expression):
+    """A user defied function is an expresison that has a reference to the 
actual
+    Python callable attached. During plan generation, the client sends a 
command to
+    the server to register the UDF before execution. The expression object can 
be
+    reused and is not attached to a specific execution. If the internal name of
+    the temporary function is set, it is assumed that the registration has 
already
+    happened."""
+
+    def __init__(self, func, return_type=pyspark.sql.types.StringType(), 
args=None):
+        super().__init__()
+
+        self._func_ref = func
+        self._return_type = return_type
+        self._args = list(args)
+        self._func_name = None
+
+    def to_plan(self, session: "RemoteSparkSession") -> Expression:
+        # Needs to materialize the UDF to the server
+        # Only do this once per session
+        func_name = session.register_udf(self._func_ref, self._return_type)
+        # Func name is used for the actual reference
+        return _build(func_name, *self._args).to_plan(session)
+
+    def __str__(self):
+        return f"UserDefinedFunction({self._func_name})"
+
+
+def _create_udf(function, return_type):
+    def wrapper(*cols: "ColumnOrString"):
+        return UserDefinedFunction(func=function, return_type=return_type, 
args=cols)
+
+    return wrapper
+
+
+def udf(function, return_type=pyspark.sql.types.StringType()):
+    """
+    Returns a callable that represents the column ones arguments are applied

Review Comment:
   Can we complete the docs or file a JIRA?



##########
project/SparkBuild.scala:
##########
@@ -593,6 +608,60 @@ object Core {
   )
 }
 
+
+object SparkConnect {
+
+  import BuildCommons.protoVersion
+
+  private val shadePrefix = "org.sparkproject.connect"
+  val shadeJar = taskKey[Unit]("Shade the Jars")
+
+  lazy val settings = Seq(
+    // Setting version for the protobuf compiler. This has to be propagated to 
every sub-project
+    // even if the project is not using it.
+    PB.protocVersion := BuildCommons.protoVersion,
+
+    // For some reason the resolution from the imported Maven build does not 
work for some
+    // of these dependendencies that we need to shade later on.
+    libraryDependencies ++= Seq(
+      "io.grpc"          % "protoc-gen-grpc-java" % BuildCommons.gprcVersion 
asProtocPlugin(),
+      "org.scala-lang" % "scala-library" % "2.12.16" % "provided",
+      "com.google.guava" % "guava"                % "31.0.1-jre",
+      "com.google.guava" % "failureaccess"        % "1.0.1",
+      "com.google.protobuf" % "protobuf-java"        % protoVersion % 
"protobuf"
+    ),
+
+    dependencyOverrides ++= Seq(
+      "com.google.guava" % "guava"                % "31.0.1-jre",
+      "com.google.guava" % "failureaccess"        % "1.0.1",
+      "com.google.protobuf" % "protobuf-java"        % protoVersion
+    ),
+
+    (Compile / PB.targets) := Seq(
+      PB.gens.java                -> (Compile / sourceManaged).value,
+      PB.gens.plugin("grpc-java") -> (Compile / sourceManaged).value
+    ),
+
+    (assembly / test) := false,
+
+    (assembly / logLevel) := Level.Info,
+
+    (assembly / assemblyShadeRules) := Seq(
+      ShadeRule.rename("io.grpc.**" -> 
"org.sparkproject.connect.grpc.@0").inAll,
+      ShadeRule.rename("com.google.common.**"-> 
"org.sparkproject.connect.guava.@1").inAll,
+      ShadeRule.rename("com.google.thirdparty.**"-> 
"org.sparkproject.connect.guava.@1").inAll,
+      ShadeRule.rename("com.google.protobuf.**"-> 
"org.sparkproject.connect.protobuf.@1").inAll,

Review Comment:
   ```suggestion
         ShadeRule.rename("com.google.common.**" -> 
"org.sparkproject.connect.guava.@1").inAll,
         ShadeRule.rename("com.google.thirdparty.**" -> 
"org.sparkproject.connect.guava.@1").inAll,
         ShadeRule.rename("com.google.protobuf.**" -> 
"org.sparkproject.connect.protobuf.@1").inAll,
   ```



##########
python/pyspark/sql/connect/functions.py:
##########
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from pyspark.sql.connect.column import ColumnRef, LiteralExpression
+from pyspark.sql.connect.column import PrimitiveType
+
+

Review Comment:
   Can we file a JIRA to complete this functions?



##########
python/pyspark/sql/connect/plan.py:
##########
@@ -0,0 +1,468 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import (
+    List,
+    Optional,
+    Sequence,
+    Tuple,
+    Union,
+    cast,
+    TYPE_CHECKING,
+)
+
+import pyspark.sql.connect.proto as proto
+from pyspark.sql.connect.column import (
+    ColumnOrString,
+    ColumnRef,
+    Expression,
+    ExpressionOrString,
+    SortOrder,
+)
+
+
+if TYPE_CHECKING:
+    from pyspark.sql.connect.client import RemoteSparkSession
+
+
+class InputValidationError(Exception):
+    pass
+
+
+class LogicalPlan(object):
+
+    INDENT = 2
+
+    def __init__(self, child: Optional["LogicalPlan"]) -> None:
+        self._child = child
+
+    def unresolved_attr(self, *colNames: str) -> proto.Expression:
+        """Creates an unresolved attribute from a column name."""
+        exp = proto.Expression()
+        exp.unresolved_attribute.parts.extend(list(colNames))
+        return exp
+
+    def to_attr_or_expression(
+        self, col: ColumnOrString, session: "RemoteSparkSession"
+    ) -> proto.Expression:
+        """Returns either an instance of an unresolved attribute or the 
serialized
+        expression value of the column."""
+        if type(col) is str:
+            return self.unresolved_attr(cast(str, col))
+        else:
+            return cast(ColumnRef, col).to_plan(session)
+
+    def plan(self, session: "RemoteSparkSession") -> proto.Relation:
+        ...
+
+    def _verify(self, session: "RemoteSparkSession") -> bool:
+        """This method is used to verify that the current logical plan
+        can be serialized to Proto and back and afterwards is identical."""
+        plan = proto.Plan()
+        plan.root.CopyFrom(self.plan(session))
+
+        serialized_plan = plan.SerializeToString()
+        test_plan = proto.Plan()
+        test_plan.ParseFromString(serialized_plan)
+
+        return test_plan == plan
+
+    # TODO(martin.grund) explain , schema

Review Comment:
   JIRA



##########
python/pyspark/sql/connect/data_frame.py:
##########
@@ -0,0 +1,241 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import (
+    Any,
+    Dict,
+    List,
+    Optional,
+    Sequence,
+    Tuple,
+    Union,
+    cast,
+    TYPE_CHECKING,
+)
+
+import pyspark.sql.connect.plan as plan
+from pyspark.sql.connect.column import (
+    ColumnOrString,
+    ColumnRef,
+    Expression,
+    ExpressionOrString,
+    LiteralExpression,
+)
+
+if TYPE_CHECKING:
+    from pyspark.sql.connect.client import RemoteSparkSession
+
+
+ColumnOrName = Union[ColumnRef, str]
+
+
+class GroupingFrame(object):
+
+    MeasuresType = Union[Sequence[Tuple[ExpressionOrString, str]], Dict[str, 
str]]
+    OptMeasuresType = Optional[MeasuresType]
+
+    def __init__(self, df: "DataFrame", *grouping_cols: Union[ColumnRef, str]) 
-> None:
+        self._df = df
+        self._grouping_cols = [x if isinstance(x, ColumnRef) else df[x] for x 
in grouping_cols]
+
+    def agg(self, exprs: MeasuresType = None) -> "DataFrame":
+
+        # Normalize the dictionary into a list of tuples.
+        if isinstance(exprs, Dict):
+            measures = list(exprs.items())
+        elif isinstance(exprs, List):
+            measures = exprs
+        else:
+            measures = []
+
+        res = DataFrame.withPlan(
+            plan.Aggregate(
+                child=self._df._plan,
+                grouping_cols=self._grouping_cols,
+                measures=measures,
+            ),
+            session=self._df._session,
+        )
+        return res
+
+    def _map_cols_to_dict(self, fun: str, cols: List[Union[ColumnRef, str]]) 
-> Dict[str, str]:
+        return {x if isinstance(x, str) else cast(ColumnRef, x).name(): fun 
for x in cols}
+
+    def min(self, *cols: Union[ColumnRef, str]) -> "DataFrame":
+        expr = self._map_cols_to_dict("min", list(cols))
+        return self.agg(expr)
+
+    def max(self, *cols: Union[ColumnRef, str]) -> "DataFrame":
+        expr = self._map_cols_to_dict("max", list(cols))
+        return self.agg(expr)
+
+    def sum(self, *cols: Union[ColumnRef, str]) -> "DataFrame":
+        expr = self._map_cols_to_dict("sum", list(cols))
+        return self.agg(expr)
+
+    def count(self) -> "DataFrame":
+        return self.agg([(LiteralExpression(1), "count")])
+
+
+class DataFrame(object):
+    """Every DataFrame object essentially is a Relation that is refined using 
the
+    member functions. Calling a method on a dataframe will essentially return 
a copy
+    of the DataFrame with the changes applied.
+    """
+
+    def __init__(self, data: List[Any] = None, schema: List[str] = None):
+        """Creates a new data frame"""
+        self._schema = schema
+        self._plan: Optional[plan.LogicalPlan] = None
+        self._cache: Dict[str, Any] = {}
+        self._session: "RemoteSparkSession" = None
+
+    @classmethod
+    def withPlan(cls, plan: plan.LogicalPlan, session=None) -> "DataFrame":
+        """Main initialization method used to construct a new data frame with 
a child plan."""
+        new_frame = DataFrame()
+        new_frame._plan = plan
+        new_frame._session = session
+        return new_frame
+
+    def select(self, *cols: ColumnRef) -> "DataFrame":
+        return DataFrame.withPlan(plan.Project(self._plan, *cols), 
session=self._session)
+
+    def agg(self, exprs: Dict[str, str]) -> "DataFrame":
+        return self.groupBy().agg(exprs)
+
+    def alias(self, alias):
+        return DataFrame.withPlan(plan.Project(self._plan).withAlias(alias), 
session=self._session)
+
+    def approxQuantile(self, col, probabilities, relativeError):
+        ...
+
+    def colRegex(self, regex) -> "DataFrame":
+        # TODO needs analysis to pick the right column
+        ...
+
+    @property
+    def columns(self) -> List[str]:
+        """Returns the list of columns of the current data frame."""
+        if self._plan is None:
+            return []
+        if "columns" not in self._cache and self._plan is not None:
+            pdd = self.limit(0).collect()
+            # Translate to standard pytho array
+            self._cache["columns"] = pdd.columns.values
+        return self._cache["columns"]
+
+    def count(self):
+        """Returns the number of rows in the data frame"""
+        return self.agg([(LiteralExpression(1), "count")]).collect().iloc[0, 0]
+
+    def crossJoin(self, other):
+        ...
+
+    def coalesce(self, num_partitions: int) -> "DataFrame":
+        # TODO needs repartition operator for substrait
+        ...
+
+    def describe(self, cols):
+        # TODO needs analyze to filter out the right columns
+        ...
+
+    def distinct(self) -> "DataFrame":
+        """Returns all distinct rows."""
+        all_cols = self.columns()
+        gf = self.groupBy(*all_cols)
+        return gf.agg()
+
+    def drop(self, *cols: ColumnOrString):
+        # TODO Needs analyze to know which columns to drop
+        all_cols = self.columns()
+        dropped = set([c.name() if isinstance(c, ColumnRef) else 
self[c].name() for c in cols])
+        filter(lambda x: x in dropped, all_cols)
+
+    def filter(self, condition: Expression) -> "DataFrame":
+        return DataFrame.withPlan(
+            plan.Filter(child=self._plan, filter=condition), 
session=self._session
+        )
+
+    def first(self):
+        return self.head(1)
+
+    def groupBy(self, *cols: ColumnOrString):
+        return GroupingFrame(self, *cols)
+
+    def head(self, n: int):
+        self.limit(n)
+        return self.collect()
+
+    def join(self, other, on, how=None):
+        return DataFrame.withPlan(
+            plan.Join(left=self._plan, right=other._plan, on=on, how=how),
+            session=self._session,
+        )
+
+    def limit(self, n):
+        return DataFrame.withPlan(plan.Limit(child=self._plan, limit=n), 
session=self._session)
+
+    def sort(self, *cols: ColumnOrName):
+        """Sort by a specific column"""
+        return DataFrame.withPlan(plan.Sort(self._plan, *cols), 
session=self._session)
+
+    def show(self, n: int, truncate: Optional[Union[bool, int]], vertical: 
Optional[bool]):
+        ...
+
+    def union(self, other) -> "DataFrame":
+        return self.unionAll(other)
+
+    def unionAll(self, other: "DataFrame") -> "DataFrame":
+        if other._plan is None:
+            raise ValueError("Argument to Union does not contain a valid 
plan.")
+        return DataFrame.withPlan(plan.UnionAll(self._plan, other._plan), 
session=self._session)
+
+    def where(self, condition):
+        return self.filter(condition)
+
+    def _get_alias(self):
+        p = self._plan
+        while p is not None:
+            if isinstance(p, plan.Project) and p.alias:
+                return p.alias
+            p = p._child
+        return None

Review Comment:
   ```suggestion
   ```



##########
python/pyspark/sql/connect/readwriter.py:
##########
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark.sql.connect.data_frame import DataFrame
+from pyspark.sql.connect.plan import Read
+
+
+class DataFrameReader:
+    def __init__(self, client):
+        self._client = client
+
+    def table(self, tableName: str) -> "DataFrame":
+        df = DataFrame.withPlan(Read(tableName), self._client)
+        return df

Review Comment:
   Can we fail a JIRA to complete the API parity?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to