Zouxxyy commented on code in PR #46831:
URL: https://github.com/apache/spark/pull/46831#discussion_r1774331081
##########
common/variant/shredding.md:
##########
@@ -0,0 +1,244 @@
+# Shredding Overview
+
+The Spark Variant type is designed to store and process semi-structured data
efficiently, even with heterogeneous values. Query engines encode each variant
value in a self-describing format, and store it as a group containing **value**
and **metadata** binary fields in Parquet. Since data is often partially
homogenous, it can be beneficial to extract certain fields into separate
Parquet columns to further improve performance. We refer to this process as
"shredding". Each Parquet file remains fully self-describing, with no
additional metadata required to read or fully reconstruct the Variant data from
the file. Combining shredding with a binary residual provides the flexibility
to represent complex, evolving data with an unbounded number of unique fields
while limiting the size of file schemas, and retaining the performance benefits
of a columnar format.
+
+This document focuses on the shredding semantics, Parquet representation,
implications for readers and writers, as well as the Variant reconstruction.
For now, it does not discuss which fields to shred, user-facing API changes, or
any engine-specific considerations like how to use shredded columns. The
approach builds on top of the generic Spark Variant representation, and
leverages the existing Parquet specification for maximum compatibility with the
open-source ecosystem.
+
+At a high level, we replace the **value** and **metadata** of the Variant
Parquet group with one or more fields called **object**, **array**,
**typed_value** and **untyped_value**. These represent a fixed schema suitable
for constructing the full Variant value for each row.
+
+Shredding lets Spark (or any other query engine) reap the full benefits of
Parquet's columnar representation, such as more compact data encoding, min/max
statistics for data skipping, and I/O and CPU savings from pruning unnecessary
fields not accessed by a query (including the non-shredded Variant binary data).
+Without shredding, any query that accesses a Variant column must fetch all
bytes of the full binary buffer. With shredding, we can get nearly equivalent
performance as in a relational (scalar) data model.
+
+For example, `select variant_get(variant_col, ‘$.field1.inner_field2’,
‘string’) from tbl` only needs to access `inner_field2`, and the file scan
could avoid fetching the rest of the Variant value if this field was shredded
into a separate column in the Parquet schema. Similarly, for the query `select
* from tbl where variant_get(variant_col, ‘$.id’, ‘integer’) = 123`, the scan
could first decode the shredded `id` column, and only fetch/decode the full
Variant value for rows that pass the filter.
+
+# Parquet Example
+
+Consider the following Parquet schema together with how Variant values might
be mapped to it. Notice that we represent each shredded field in **object** as
a group of two fields, **typed_value** and **untyped_value**. We extract all
homogenous data items of a certain path into **typed_value**, and set aside
incompatible data items in **untyped_value**. Intuitively, incompatibilities
within the same path may occur because we store the shredding schema per
Parquet file, and each file can contain several row groups. Selecting a type
for each field that is acceptable for all rows would be impractical because it
would require buffering the contents of an entire file before writing.
+
+Typically, the expectation is that **untyped_value** exists at every level as
an option, along with one of **object**, **array** or **typed_value**. If the
actual Variant value contains a type that does not match the provided schema,
it is stored in **untyped_value**. An **untyped_value** may also be populated
if an object can be partially represented: any fields that are present in the
schema must be written to those fields, and any missing fields are written to
**untyped_valud**.
+
+```
+optional group variant_col {
+ optional binary untyped_value;
+ optional group object {
+ optional group a {
+ optional binary untyped_value;
+ optional int64 typed_value;
+ }
+ optional group b {
+ optional binary untyped_value;
+ optional group object {
+ optional group c {
+ optional binary untyped_value;
+ optional binary typed_value (STRING);
+ }
+ }
+ }
+ }
+}
+```
+
+| Variant Value | Top-level untyped_value | b.untyped_value | Non-null in a |
Non-null in b.c |
+|---------------|--------------------------|---------------|---------------|
+| {a: 123, b: {c: “hello”}} | null | null | typed_value | typed_value |
+| {a: 1.23, b: {c: “123”}} | null | null | untyped_value | typed_value |
+| {a: [1,2,3], b: {c: null}} | null | null | untyped_value | untyped_value |
+| {a: 123, c: 456} | {c: 456} | null | typed_value | null |
+| {a: 123, b: {c: "hello", d: 456}} | null | {d: 456} | typed_value |
typed_value |
+| [{a: 1, b: {c: 2}}, {a: 3, b: {c: 4}}] | [{a: 1, b: {c: 2}}, {a: 3, b: {c:
4}}] | null | null | null |
+
+# Parquet Layout
+
+The **array** and **object** fields represent Variant array and object types,
respectively. Arrays must use the three-level list structure described in
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md.
+
+An **object** field must be a group. Each field name of this inner group
corresponds to the Variant value's object field name. Each inner field's type
is a recursively shredded variant value: that is, the fields of each object
field must be one or more of **object**, **array**, **typed_value** or
**untyped_value**.
+
+Similarly the elements of an **array** must be a group containing one or more
of **object**, **array**, **typed_value** or **untyped_value**.
+
+Each leaf in the schema can store an arbitrary Variant value. It contains an
**untyped_value** binary field and a **typed_value** field. If non-null,
**untyped_value** represents the value stored as a Variant binary; the metadata
and value of a normal Variant are concatenated. The **typed_value** field may
be any type that has a corresponding Variant type. For each value in the data,
at most one of the **typed_value** and **untyped_value** may be non-null. A
writer may omit either field, which is equivalent to all rows being null.
+
+| typed_value | untyped_value | Meaning |
+|-------------|----------------|---------|
+| null | null | Field is missing in the reconstructed Variant. |
+| null | non-null | Field may be any type in the reconstructed Variant. |
+| non-null | null | Field has this column’s type in the reconstructed Variant.
|
+| non-null | non-null | Invalid |
+
+The **typed_value** may be absent from the Parquet schema for any field, which
is equivalent to its value being always null (in which case the shredded field
is always stored as a Variant binary). By the same token, **untyped_value** may
be absent, which is equivalent to their value being always null (in which case
the field will always be missing or have the type of the **typed_value**
column).
+
+The full metadata and value can be reconstructed from **untyped_value** by
treating the leading bytes as metadata, and using the header, dictionary size
and final dictionary offset to determine the start of the Variant value
section. (See the metadata description in the common/variant/README.md for more
detail on how to interpret it.) For example, in the binary below, there is a
one-element dictionary, and the final offset (`offset[1]`) indicates that the
last dictionary entry ends at the second byte. Therefore the full metadata size
is six bytes, and the rest is the value section of the Variant.
+
+```
+ hdr sz offset[0] offset[1] bytes[0] bytes[1] value
+ --------------------------------------------------------------------
+| | | | | | |
+| 0x01 | 0x01 | 0x00 | 0x02 | ‘h’ | ‘i’ | . . . . . . . .
+|______|______|_________|_________|________|________|________________
+```
+
+# Unshredded values
+
+If all values can be represented at a given level by whichever of **object**,
**array** or **typed_value** is present, **untyped_value** is set to null.
+
+If a value cannot be represented by whichever of **object**, **array** or
**typed_value** is present in the schema, then it is stored in
**untyped_value**, and the other fields are set to null. In the Parquet example
above, if field **a** was an object or array, or a non-integer scalar, it would
be stored in **untyped_value**.
+
+If a value is an object, and the **object** field is present but does not
contain all of the fields in the value, then any remaining fields are stored in
an object in **untyped_value**. In the Parquet example above, if field **b**
was an object of the form **{"c": 1, "d": 2}"**, then the object **{"d": 2}**
would be stored in **untyped_value**, and the **c** field would be shredded
recursively under **object.c**.
+
+Note that an array is always fully shredded if there is an **array** field, so
the above consideration for **object** is not relevant for arrays: only one of
**array** or **untyped_value** may be non-null at a given level.
+
+# Using untyped_value vs. typed_value
+
+In general, it is desirable to store values in the **typed_value** field
rather than the **untyped_value** whenever possible. This will typically
improve encoding efficiency, and allow the use of Parquet statistics to filter
at the row group or page level. In the best case, the **untyped_value** fields
are all null and the engine does not need to read them (or it can omit them
from the schema on write entirely). There are two main motivations for
including the **untyped_value** column:
+
+1) In a case where there are rare type mismatches (for example, a numeric
field with rare strings like “n/a”), we allow the field to be shredded, which
could still be a significant performance benefit compared to fetching and
decoding the full value/metadata binary.
+2) Since there is a single schema per file, there would be no easy way to
recover from a type mismatch encountered late in a file write. Parquet files
can be large, and buffering all file data before starting to write could be
expensive. Including an untyped column for every field guarantees we can adhere
to the requested shredding schema.
+
+The **untyped_value** is stored in a single binary column, rather than storing
the value and metadata separately as is done in the unshredded binary format.
The motivation for storing them separately for unshredded data is that this
lets the engine encode and compress the metadata more efficiently when the
fields are consistent across rows. We chose to combine them in the shredded
fields: we expect the encoding/compression benefit to be lower, since in the
case of uniform data, the values should be stored in typed columns. Using a
single binary reduces the extra metadata required in the Parquet schema, which
can be significant in some cases.
Review Comment:
> The motivation for storing them separately for unshredded data is that
this lets the engine encode and compress the metadata more efficiently when the
fields are consistent across rows.
Hi @cashmand Is there any performance comparison for this part? We are
considering introducing variants into Apache Paimon as a new type. If variant
is a single binary column, it will be easier to integrate. If it is a group
containing value and metadata, the integration cost will be higher, such as:
dedicated Unsafe Row reader writer, dedicated ColumnVector for reader/writer,
etc.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]