rangadi commented on code in PR #39039:
URL: https://github.com/apache/spark/pull/39039#discussion_r1055056543
########## docs/sql-data-sources-protobuf.md: ##########
@@ -0,0 +1,302 @@
+---
+layout: global
+title: Protobuf Data Source Guide
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+* This will become a table of contents (this text will be scraped).
+{:toc}
+
+Since the Spark 3.4.0 release, [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) provides built-in support for reading and writing protobuf data.
+
+## Deploying
+The `spark-protobuf` module is external and not included in `spark-submit` or `spark-shell` by default.
+
+As with any Spark application, `spark-submit` is used to launch your application. `spark-protobuf_{{site.SCALA_BINARY_VERSION}}`
+and its dependencies can be directly added to `spark-submit` using `--packages`, such as,
+
+    ./bin/spark-submit --packages org.apache.spark:spark-protobuf_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ...
+
+For experimenting on `spark-shell`, you can also use `--packages` to add `org.apache.spark:spark-protobuf_{{site.SCALA_BINARY_VERSION}}` and its dependencies directly,
+
+    ./bin/spark-shell --packages org.apache.spark:spark-protobuf_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ...
+
+See the [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies.
+
+## to_protobuf() and from_protobuf()
+The spark-protobuf package provides the function `to_protobuf` to encode a column as binary in protobuf
+format, and `from_protobuf()` to decode protobuf binary data into a column. Both functions transform one column to
+another column, and the input/output SQL data type can be a complex type or a primitive type.
+
+Using protobuf messages as columns is useful when reading from or writing to a streaming source like Kafka. Each
+Kafka key-value record will be augmented with some metadata, such as the ingestion timestamp into Kafka, the offset in Kafka, etc.
+* If the "value" field that contains your data is in protobuf, you could use `from_protobuf()` to extract your data, enrich it, clean it, and then push it downstream to Kafka again or write it out to a different sink.
+* `to_protobuf()` can be used to turn structs into protobuf messages. This method is particularly useful when you would like to re-encode multiple columns into a single one when writing data out to Kafka.
+
+The Spark SQL schema is generated based on the protobuf descriptor file or protobuf class passed to `from_protobuf` and `to_protobuf`. The specified protobuf class or protobuf descriptor file must match the data; otherwise, the behavior is undefined: it may fail or return arbitrary results.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.spark.sql.protobuf.functions._

Review Comment:
   One suggestion for these examples: for users it is hard to get an idea of what the protobuf looks like and how the corresponding Spark schema looks. Could you include a sample protobuf (it could be simple) in a comment, and the corresponding Spark schema (as output of `df.printSchema()`)? That gives a good idea. An example I used in a Databricks notebook:
   <img width="1020" alt="image" src="https://user-images.githubusercontent.com/502522/209049901-8341dfd5-523d-439b-94a3-fa3e3d1175b9.png">
   It is ok if this makes the example look large. That is fine. No one complains about a good example that tells them more.
   <img width="859" alt="image" src="https://user-images.githubusercontent.com/502522/209050070-ffcca9e6-06dd-41f4-a539-9b161eac7ef0.png">
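For reference, a minimal sketch of the kind of example being asked for here. The `SimpleMessage` definition, the descriptor path, and the data path below are made up for illustration; only the `from_protobuf` call and the general shape of the printed schema follow the documented API.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.from_protobuf

// Hypothetical message, compiled to a descriptor file with e.g.:
//   protoc --include_imports --descriptor_set_out=/tmp/user.desc user.proto
//
//   syntax = "proto3";
//   message SimpleMessage {
//     string name = 1;
//     int64 id = 2;
//     string favorite_color = 3;
//   }

val spark = SparkSession.builder().getOrCreate()

// Assume `value` is a binary column holding serialized SimpleMessage records.
val binaryDf = spark.read.load("/tmp/simple_message_data")

val parsed = binaryDf.select(
  from_protobuf(col("value"), "SimpleMessage", "/tmp/user.desc") as "message")

parsed.printSchema()
// Expected output, roughly (nullability may differ by version):
// root
//  |-- message: struct (nullable = true)
//  |    |-- name: string (nullable = true)
//  |    |-- id: long (nullable = true)
//  |    |-- favorite_color: string (nullable = true)
```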
########## docs/sql-data-sources-protobuf.md: ##########
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.spark.sql.protobuf.functions._
+import spark.implicits._
+
+// `from_protobuf` and `to_protobuf` provide two schema choices: via the protobuf descriptor file, or via the protobuf message class name.
+
+val df = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("subscribe", "topic1")
+  .load()
+
+// 1. Decode the Protobuf data into a struct;
+// 2. Filter by column `favorite_color`;
+// 3. Encode the column `user` in Protobuf format.
+// The Protobuf protoc command can be used to generate a protobuf descriptor file for a given .proto file.
+val output = df
+  .select(from_protobuf($"value", protobufMessageName, descriptorFilePath) as "user")
+  .where("user.favorite_color == \"red\"")
+  .select(to_protobuf($"user", protobufMessageName, descriptorFilePath) as "value")
+
+val query = output
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic2")
+  .start()
+
+// Alternatively, you can decode and encode the SQL columns into protobuf format using the protobuf class name.
+// The specified Protobuf class must match the data, otherwise the behavior is undefined: it may fail or return arbitrary results.
+// The jar containing the Java class should be shaded. Specifically, `com.google.protobuf.*` should be shaded to
+// `org.sparkproject.spark-protobuf.protobuf.*`.
+val output = df
+  .select(from_protobuf($"value", "org.spark.protobuf.User") as "user")
+  .where("user.favorite_color == \"red\"")
+  .select(to_protobuf($"user", "org.spark.protobuf.User") as "value")
+
+val query = output
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic2")
+  .start()
+
+{% endhighlight %}
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.protobuf.functions.*;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.streaming.StreamingQuery;
+
+// `from_protobuf` and `to_protobuf` provide two schema choices: via the protobuf descriptor file, or via the protobuf message class name.
+
+Dataset<Row> df = spark
+  .readStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("subscribe", "topic1")
+  .load();
+
+// 1. Decode the Protobuf data into a struct;
+// 2. Filter by column `favorite_color`;
+// 3. Encode the column `user` in Protobuf format.
+// The Protobuf protoc command can be used to generate a protobuf descriptor file for a given .proto file.
+Dataset<Row> output = df
+  .select(from_protobuf(col("value"), protobufMessageName, descriptorFilePath).as("user"))
+  .where("user.favorite_color == \"red\"")
+  .select(to_protobuf(col("user"), protobufMessageName, descriptorFilePath).as("value"));
+
+// Alternatively, you can decode and encode the SQL columns into protobuf format using the protobuf class name.
+// The specified Protobuf class must match the data, otherwise the behavior is undefined: it may fail or return arbitrary results.
+// The jar containing the Java class should be shaded. Specifically, `com.google.protobuf.*` should be shaded to
+// `org.sparkproject.spark-protobuf.protobuf.*`.
+Dataset<Row> output = df
+  .select(from_protobuf(col("value"), "org.spark.protobuf.User").as("user"))
+  .where("user.favorite_color == \"red\"")
+  .select(to_protobuf(col("user"), "org.spark.protobuf.User").as("value"));
+
+StreamingQuery query = output
+  .writeStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic2")
+  .start();
+
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf
+
+# `from_protobuf` and `to_protobuf` provide two schema choices: via the protobuf descriptor file, or via the protobuf message class name.
+
+df = spark \
+  .readStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("subscribe", "topic1") \
+  .load()
+
+# 1. Decode the Protobuf data into a struct;
+# 2. Filter by column `favorite_color`;
+# 3. Encode the column `user` in Protobuf format.
+# The Protobuf protoc command can be used to generate a protobuf descriptor file for a given .proto file.
+output = df \
+  .select(from_protobuf("value", protobufMessageName, descriptorFilePath).alias("user")) \
+  .where('user.favorite_color == "red"') \
+  .select(to_protobuf("user", protobufMessageName, descriptorFilePath).alias("value"))
+
+# Alternatively, you can decode and encode the SQL columns into protobuf format using the protobuf class name.
+# The specified Protobuf class must match the data, otherwise the behavior is undefined: it may fail or return arbitrary results.
+# The jar containing the Java class should be shaded. Specifically, `com.google.protobuf.*` should be shaded to
+# `org.sparkproject.spark-protobuf.protobuf.*`.
+output = df \
+  .select(from_protobuf("value", "org.spark.protobuf.User").alias("user")) \
+  .where('user.favorite_color == "red"') \
+  .select(to_protobuf("user", "org.spark.protobuf.User").alias("value"))
+
+query = output \
+  .writeStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("topic", "topic2") \
+  .start()
+
+{% endhighlight %}
+</div>
+</div>
+
+## Supported types for Protobuf -> Spark SQL conversion
+Currently Spark supports reading [protobuf scalar types](https://developers.google.com/protocol-buffers/docs/proto3#scalar), [enum types](https://developers.google.com/protocol-buffers/docs/proto3#enum), [nested types](https://developers.google.com/protocol-buffers/docs/proto3#nested), and [map types](https://developers.google.com/protocol-buffers/docs/proto3#maps) under messages of Protobuf.
+One common issue that can arise when working with Protobuf data is the presence of circular references. In Protobuf, a circular reference occurs when a field refers back to itself or to another field that refers back to the original field. This can cause issues when parsing the data, as it can result in infinite loops or other unexpected behavior.
Review Comment:
   I have a suggestion for rephrasing this section, but you can take an attempt at it. An example would clarify things a lot for users.
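To make the suggested example concrete, here is one possible shape for it. The `Person` message below is hypothetical and not taken from the PR; it only illustrates why a self-referencing field makes naive schema derivation problematic.

```scala
// Hypothetical .proto illustrating a circular reference: `Person.friend`
// refers back to the `Person` message itself.
val circularProto: String =
  """syntax = "proto3";
    |
    |message Person {
    |  string name = 1;
    |  Person friend = 2;  // Person -> Person -> Person -> ...
    |}
    |""".stripMargin

// Deriving a Spark SQL schema for `Person` naively would never terminate:
//   struct<name: string, friend: struct<name: string, friend: struct<...>>>
// Each nested `friend` contains another full `Person`, which is the
// "infinite loop" behavior the paragraph above warns about. How (or whether)
// a given spark-protobuf version accepts such a message is version-dependent
// and not covered by the quoted text.
```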
########## docs/sql-data-sources-protobuf.md: ##########
+One common issue that can arise when working with Protobuf data is the presence of circular references. In Protobuf, a circular reference occurs when a field refers back to itself or to another field that refers back to the original field. This can cause issues when parsing the data, as it can result in infinite loops or other unexpected behavior.

Review Comment:
   I think we can move the recursive protobuf discussion to a sub-section. Right now the section jumps directly into it. This is not a new feature; you implemented it in the first version itself :). It would also be good to include an example. We can even move this further down.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
