SandishKumarHN commented on code in PR #39039:
URL: https://github.com/apache/spark/pull/39039#discussion_r1065079899


##########
docs/sql-data-sources-protobuf.md:
##########
@@ -0,0 +1,302 @@
+---
+layout: global
+title: Protobuf Data Source Guide
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+* This will become a table of contents (this text will be scraped).
+{:toc}
+
+Since the Spark 3.4.0 release, [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) provides built-in support for reading and writing Protobuf data.
+
+## Deploying
+The `spark-protobuf` module is external and not included in `spark-submit` or `spark-shell` by default.
+
+As with any Spark application, `spark-submit` is used to launch your application. `spark-protobuf_{{site.SCALA_BINARY_VERSION}}`
+and its dependencies can be added directly to `spark-submit` using `--packages`, for example:
+
+    ./bin/spark-submit --packages org.apache.spark:spark-protobuf_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ...
+
+When experimenting with `spark-shell`, you can also use `--packages` to add `org.apache.spark:spark-protobuf_{{site.SCALA_BINARY_VERSION}}` and its dependencies directly:
+
+    ./bin/spark-shell --packages org.apache.spark:spark-protobuf_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ...
+
+See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies.
+
+## to_protobuf() and from_protobuf()
+The `spark-protobuf` package provides the `to_protobuf()` function to encode a column as binary in Protobuf
+format, and `from_protobuf()` to decode Protobuf binary data into a column. Both functions transform one column into
+another, and the input/output SQL data type can be a complex type or a primitive type.
+
+Using Protobuf messages as columns is useful when reading from or writing to a streaming source like Kafka. Each
+Kafka key-value record is augmented with some metadata, such as the ingestion timestamp into Kafka, the offset in Kafka, and so on.
+* If the "value" field that contains your data is in Protobuf, you can use `from_protobuf()` to extract your data, enrich it, clean it, and then push it downstream to Kafka again or write it out to a different sink.
+* `to_protobuf()` can be used to turn structs into Protobuf messages. This method is particularly useful when you would like to re-encode multiple columns into a single one when writing data out to Kafka.
+
+The Spark SQL schema is generated based on the Protobuf descriptor file or Protobuf class passed to `from_protobuf` and `to_protobuf`. The specified Protobuf class or descriptor file must match the data; otherwise, the behavior is undefined: it may fail or return arbitrary results.
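+
+For example, the snippets below assume a simple `User` message along these lines (hypothetical; use whatever message matches your data):
+
+{% highlight protobuf %}
+syntax = "proto3";
+
+message User {
+  string name = 1;
+  string favorite_color = 2;
+}
+{% endhighlight %}
+
+A matching descriptor file can be generated with `protoc`:
+
+    protoc --include_imports --descriptor_set_out=user.desc user.proto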
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.spark.sql.protobuf.functions._
+
+// `from_protobuf` and `to_protobuf` provide two schema choices: via a Protobuf descriptor file, or via the Protobuf message class name.
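+
+// Hypothetical placeholders: point these at your own message and descriptor file
+// (the descriptor file is generated by protoc, as shown above).
+val protobufMessageName = "User"
+val descriptorFilePath = "/path/to/user.desc"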
+
+val df = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("subscribe", "topic1")
+  .load()
+
+// 1. Decode the Protobuf data into a struct.
+// 2. Filter by column `favorite_color`.
+// 3. Encode the column `user` in Protobuf format.
+// The protoc command can be used to generate a Protobuf descriptor file for a given .proto file.
+val output = df
+  .select(from_protobuf($"value", protobufMessageName, descriptorFilePath) as "user")
+  .where("user.favorite_color == \"red\"")
+  .select(to_protobuf($"user", protobufMessageName, descriptorFilePath) as "value")
+
+val query = output
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic2")
+  .start()
+
+// Alternatively, you can decode and encode the SQL columns into Protobuf format using the Protobuf class name.
+// The specified Protobuf class must match the data, otherwise the behavior is undefined:
+// it may fail or return arbitrary results. The jar containing the Java class should be shaded.
+// Specifically, `com.google.protobuf.*` should be shaded to `org.sparkproject.spark_protobuf.protobuf.*`.
+val output = df
+  .select(from_protobuf($"value", "org.spark.protobuf.User") as "user")
+  .where("user.favorite_color == \"red\"")
+  .select(to_protobuf($"user", "org.spark.protobuf.User") as "value")
+
+val query = output
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic2")
+  .start()
+
+{% endhighlight %}
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.protobuf.functions.*;
+
+// `from_protobuf` and `to_protobuf` provide two schema choices: via a Protobuf descriptor file, or via the Protobuf message class name.
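+
+// Hypothetical placeholders: point these at your own message and descriptor file
+// (the descriptor file is generated by protoc, as shown above).
+String protobufMessageName = "User";
+String descriptorFilePath = "/path/to/user.desc";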
+
+Dataset<Row> df = spark
+  .readStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("subscribe", "topic1")
+  .load();
+
+// 1. Decode the Protobuf data into a struct.
+// 2. Filter by column `favorite_color`.
+// 3. Encode the column `user` in Protobuf format.
+// The protoc command can be used to generate a Protobuf descriptor file for a given .proto file.
+Dataset<Row> output = df
+  .select(from_protobuf(col("value"), protobufMessageName, descriptorFilePath).as("user"))
+  .where("user.favorite_color == \"red\"")
+  .select(to_protobuf(col("user"), protobufMessageName, descriptorFilePath).as("value"));
+
+// Alternatively, you can decode and encode the SQL columns into Protobuf format using the Protobuf class name.
+// The specified Protobuf class must match the data, otherwise the behavior is undefined:
+// it may fail or return arbitrary results. The jar containing the Java class should be shaded.
+// Specifically, `com.google.protobuf.*` should be shaded to `org.sparkproject.spark_protobuf.protobuf.*`.
+output = df
+  .select(from_protobuf(col("value"), "org.spark.protobuf.User").as("user"))
+  .where("user.favorite_color == \"red\"")
+  .select(to_protobuf(col("user"), "org.spark.protobuf.User").as("value"));
+
+StreamingQuery query = output
+  .writeStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic2")
+  .start();
+
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf
+
+# `from_protobuf` and `to_protobuf` provide two schema choices: via a Protobuf descriptor file, or via the Protobuf message class name.
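+
+# Hypothetical placeholders: point these at your own message and descriptor file
+# (the descriptor file is generated by protoc, as shown above).
+protobufMessageName = "User"
+descriptorFilePath = "/path/to/user.desc"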
+
+df = spark \
+  .readStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("subscribe", "topic1") \
+  .load()
+
+# 1. Decode the Protobuf data into a struct.
+# 2. Filter by column `favorite_color`.
+# 3. Encode the column `user` in Protobuf format.
+# The protoc command can be used to generate a Protobuf descriptor file for a given .proto file.
+output = df \
+  .select(from_protobuf("value", protobufMessageName, descriptorFilePath).alias("user")) \
+  .where('user.favorite_color == "red"') \
+  .select(to_protobuf("user", protobufMessageName, descriptorFilePath).alias("value"))
+
+# Alternatively, you can decode and encode the SQL columns into Protobuf format using the Protobuf class name.
+# The specified Protobuf class must match the data, otherwise the behavior is undefined:
+# it may fail or return arbitrary results. The jar containing the Java class should be shaded.
+# Specifically, `com.google.protobuf.*` should be shaded to `org.sparkproject.spark_protobuf.protobuf.*`.
+output = df \
+  .select(from_protobuf("value", "org.spark.protobuf.User").alias("user")) \
+  .where('user.favorite_color == "red"') \
+  .select(to_protobuf("user", "org.spark.protobuf.User").alias("value"))
+
+query = output \
+  .writeStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("topic", "topic2") \
+  .start()
+
+{% endhighlight %}
+</div>
+</div>
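+
+When using the class-name approach, the jar that contains the generated Protobuf classes must relocate `com.google.protobuf.*` as noted above. A minimal sketch of such a relocation with the sbt-assembly plugin (assuming an sbt build; the names below are illustrative, and the maven-shade-plugin's relocation feature achieves the same):
+
+{% highlight scala %}
+// build.sbt (sketch): relocate the protobuf-java runtime packaged with your classes
+assembly / assemblyShadeRules := Seq(
+  ShadeRule.rename("com.google.protobuf.**" -> "org.sparkproject.spark_protobuf.protobuf.@1").inAll
+)
+{% endhighlight %}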
+
+## Supported types for Protobuf -> Spark SQL conversion
+Currently Spark supports reading [Protobuf scalar types](https://developers.google.com/protocol-buffers/docs/proto3#scalar), [enum types](https://developers.google.com/protocol-buffers/docs/proto3#enum), [nested types](https://developers.google.com/protocol-buffers/docs/proto3#nested), and [map types](https://developers.google.com/protocol-buffers/docs/proto3#maps) under Protobuf messages.
+One common issue that can arise when working with Protobuf data is the presence of circular references. In Protobuf, a circular reference occurs when a field refers back to its own message, either directly or through another field that refers back to the original message. This can cause issues when parsing the data, as it can result in infinite loops or other unexpected behavior.
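+
+For example, a hypothetical message that refers back to itself creates a circular reference:
+
+{% highlight protobuf %}
+message Node {
+  string value = 1;
+  Node next = 2;  // circular: `Node` contains a field of its own type
+}
+{% endhighlight %}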

Review Comment:
   @rangadi, I have made the suggested changes. Could you please review them again?


