SandishKumarHN commented on code in PR #39550:
URL: https://github.com/apache/spark/pull/39550#discussion_r1170698903


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/functions.scala:
##########
@@ -72,51 +69,129 @@ object functions {
   }
 
   /**
-   * Converts a binary column of Protobuf format into its corresponding catalyst value. The
-   * specified Protobuf class must match the data, otherwise the behavior is
-   * undefined: it may fail or return arbitrary result. The jar containing Java class should be
+   * Converts a binary column of Protobuf format into its corresponding catalyst value.
+   * `messageClassName` points to Protobuf Java class. The jar containing Java class should be
    * shaded. Specifically, `com.google.protobuf.*` should be shaded to
    * `org.sparkproject.spark-protobuf.protobuf.*`.
+   * https://github.com/rangadi/shaded-protobuf-classes is useful to create shaded jar from
+   * Protobuf files.
    *
    * @param data
    *   the binary column.
-   * @param shadedMessageClassName
-   *   The Protobuf class name. E.g. <code>org.spark.examples.protobuf.ExampleEvent</code>.
+   * @param messageClassName
+   *   The full name for Protobuf Java class. E.g. <code>com.example.protos.ExampleEvent</code>.
    *   The jar with these classes needs to be shaded as described above.
    * @since 3.4.0
    */
   @Experimental
-  def from_protobuf(data: Column, shadedMessageClassName: String): Column = {
-    new Column(ProtobufDataToCatalyst(data.expr, shadedMessageClassName))
+  def from_protobuf(data: Column, messageClassName: String): Column = {
+    new Column(ProtobufDataToCatalyst(data.expr, messageClassName))
   }
 
   /**
-   * Converts a column into binary of protobuf format.
+   * Converts a binary column of Protobuf format into its corresponding catalyst value.
+   * `messageClassName` points to Protobuf Java class. The jar containing Java class should be
+   * shaded. Specifically, `com.google.protobuf.*` should be shaded to
+   * `org.sparkproject.spark-protobuf.protobuf.*`.
+   * https://github.com/rangadi/shaded-protobuf-classes is useful to create shaded jar from
+   * Protobuf files.
+   *
+   * @param data
+   *   the binary column.
+   * @param messageClassName
+   *   The full name for Protobuf Java class. E.g. <code>com.example.protos.ExampleEvent</code>.
+   *   The jar with these classes needs to be shaded as described above.
+   * @param options
+   * @since 3.4.0
+   */
+  @Experimental

Review Comment:
   @rangadi @ericsun95 I was able to get it to work with the binaryFile format 
by making a few minor changes to the from_protobuf module.
   
   ```
   import org.apache.spark.sql.functions.col

   // Each .pbf file is loaded as one row; its raw bytes are in the "content" column.
   val df = spark.read.format("binaryFile").load("git/OSM-binary/resources/")

   // descFile (defined elsewhere) is the path to the Protobuf descriptor file.
   val protoDf = df
     .withColumn("Headerblock",
       org.apache.spark.sql.protobuf.functions.from_protobuf(col("content"), "HeaderBlock", descFile))
     .withColumn("DenseInfo",
       org.apache.spark.sql.protobuf.functions.from_protobuf(col("content"), "DenseInfo", descFile))
     .drop("content", "length", "modificationTime", "path")

   protoDf.show(truncate = false)
   ```
   
   The `ProtobufDataToCatalyst` class also needs to be tweaked a bit at this [line](https://github.com/apache/spark/blob/master/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala#L103):
   
   ```
   if (buffer.get() == MAGIC_BYTE) {
     val dataInput = new DataInputStream(new ByteArrayInputStream(buffer.array()))
     // A 4-byte length prefix precedes the BlobHeader message.
     val headerSize = dataInput.readInt()
     if (headerSize > MAX_HEADER_SIZE) {
       throw new FileFormatException(
         "Unexpectedly long header " + MAX_HEADER_SIZE + " bytes. Possibly corrupt file.")
     }
     binary = new Array[Byte](headerSize)
     dataInput.readFully(binary)

     val blobHeader = BlobHeader.parseFrom(binary)

     val out: FileBlock = FileBlock.newInstance(blobHeader.getType, null, blobHeader.getIndexdata)
     // The BlobHeader gives the size of the Blob message that follows it.
     val binaryData = new Array[Byte](blobHeader.getDatasize)
     dataInput.readFully(binaryData)

     val blob = Blob.parseFrom(binaryData)

     if (blob.hasRaw()) {
       // Uncompressed payload: use it directly.
       binary = blob.getRaw().toByteArray
     } else if (blob.hasZlibData()) {
       // zlib-compressed payload: inflate into a buffer of the declared raw size.
       val buf2 = new Array[Byte](blob.getRawSize())
       val decompresser = new Inflater()
       decompresser.setInput(blob.getZlibData().toByteArray())
       try {
         decompresser.inflate(buf2)
       } catch {
         case e: DataFormatException =>
           throw new UncheckedIOException(new FileFormatException(e))
       }
       assert(decompresser.finished())
       decompresser.end()
       binary = ByteString.copyFrom(buf2).toByteArray
     }
   }
   ```
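   
   The framing and zlib steps above can be sketched without the OSM classes, using only the JDK. This is a minimal sketch under stated assumptions, not the code in the PR: `LengthPrefixedZlib`, `readFrame`, `inflate`, and the `MaxHeaderSize` value are hypothetical names picked for illustration.
   
   ```scala
   import java.io.DataInputStream
   import java.util.zip.{DataFormatException, Inflater}

   object LengthPrefixedZlib {
     // Hypothetical limit for this sketch; not taken from the PR or the OSM sources.
     val MaxHeaderSize: Int = 64 * 1024

     /** Reads a 4-byte big-endian length prefix, then exactly that many bytes. */
     def readFrame(in: DataInputStream): Array[Byte] = {
       val size = in.readInt()
       if (size < 0 || size > MaxHeaderSize) {
         throw new IllegalArgumentException(
           s"Unexpected frame size $size bytes (limit $MaxHeaderSize). Possibly corrupt input.")
       }
       val bytes = new Array[Byte](size)
       in.readFully(bytes)
       bytes
     }

     /** Inflates zlib data whose uncompressed size is known in advance. */
     def inflate(compressed: Array[Byte], rawSize: Int): Array[Byte] = {
       val out = new Array[Byte](rawSize)
       val inflater = new Inflater()
       try {
         inflater.setInput(compressed)
         var written = 0
         // inflate() may return fewer bytes than requested, so loop until done.
         while (written < rawSize && !inflater.finished()) {
           val n = inflater.inflate(out, written, rawSize - written)
           if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
             throw new DataFormatException("Truncated or invalid zlib stream")
           }
           written += n
         }
         if (written != rawSize) {
           throw new DataFormatException(s"Expected $rawSize bytes, inflated $written")
         }
         out
       } finally {
         inflater.end() // always release the native zlib resources
       }
     }
   }
   ```
   
   Compared with a single `inflate` call followed by an `assert`, the loop handles partial reads and the `finally` block releases the `Inflater` even when decompression fails.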
   
   @rangadi would it be useful to have a Spark data source for .pbf files? I would be happy to work on it.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

