zhengruifeng commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1044220839


##########
python/pyspark/sql/connect/session.py:
##########
@@ -264,9 +294,71 @@ def createDataFrame(self, data: "pd.DataFrame") -> 
"DataFrame":
 
         """
         assert data is not None
-        if len(data) == 0:
+        if isinstance(data, DataFrame):
+            raise TypeError("data is already a DataFrame")
+        if isinstance(data, Sized) and len(data) == 0:
             raise ValueError("Input data cannot be empty")
-        return DataFrame.withPlan(plan.LocalRelation(data), self)
+
+        _schema: Optional[StructType] = None
+        _schema_str: Optional[str] = None
+        _cols: Optional[List[str]] = None
+
+        if isinstance(schema, StructType):
+            _schema = schema
+
+        elif isinstance(schema, str):
+            _schema_str = schema
+
+        elif isinstance(schema, (list, tuple)):
+            # Must re-encode any unicode strings to be consistent with 
StructField names
+            _cols = [x.encode("utf-8") if not isinstance(x, str) else x for x 
in schema]
+
+        # Create the Pandas DataFrame
+        if isinstance(data, pd.DataFrame):
+            pdf = data
+
+        elif isinstance(data, np.ndarray):
+            # `data` of numpy.ndarray type will be converted to a pandas 
DataFrame,
+            if data.ndim not in [1, 2]:
+                raise ValueError("NumPy array input should be of 1 or 2 
dimensions.")
+
+            pdf = pd.DataFrame(data)
+
+            if _cols is None:
+                if data.ndim == 1 or data.shape[1] == 1:
+                    _cols = ["value"]
+                else:
+                    _cols = ["_%s" % i for i in range(1, data.shape[1] + 1)]
+
+        else:
+            pdf = pd.DataFrame(list(data))
+
+            if _cols is None:
+                _cols = ["_%s" % i for i in range(1, pdf.shape[1] + 1)]
+
+        # Validate number of columns
+        num_cols = pdf.shape[1]
+        if _schema is not None and len(_schema.fields) != num_cols:
+            raise ValueError(
+                f"Length mismatch: Expected axis has {num_cols} elements, "
+                f"new values have {len(_schema.fields)} elements"
+            )
+        elif _cols is not None and len(_cols) != num_cols:
+            raise ValueError(
+                f"Length mismatch: Expected axis has {num_cols} elements, "
+                f"new values have {len(_cols)} elements"
+            )
+
+        table = pa.Table.from_pandas(pdf)
+
+        if _schema is not None:
+            return DataFrame.withPlan(LocalRelation(table, schema=_schema), 
self)
+        elif _schema_str is not None:
+            return DataFrame.withPlan(LocalRelation(table, 
schema=_schema_str), self)

Review Comment:
   If we can have an RPC for `parseTableSchema` in `AnalyzePlan` and implement 
`DataFrame.to`, then we do not need to add `schema` to `LocalRelation`'s proto, 
and can simplify here with `DataFrame.withPlan(LocalRelation(table), 
self).toDF(...).to(...)`
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to