Hi Averell,

If you can describe the JSON schema, I'd suggest looking into the SQL
API with the JSON format [1] and the Parquet format [2]. (And I think
you do need to define your schema upfront; if I am not mistaken,
Parquet must know the common schema.)

Then you could do something like:

CREATE TABLE json (
  -- define the schema of your JSON data
) WITH (
  ...
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);
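
Since your JSON is nested, note that nested JSON objects map onto ROW
types (and JSON arrays onto ARRAY types) in the DDL. A minimal sketch of
what the column definitions could look like (the field names here are
made up for illustration, not taken from your data):

  id BIGINT,
  -- each nested JSON object becomes a ROW; ROWs can nest further
  payload ROW<name STRING, address ROW<city STRING, zip STRING>>,
  -- a JSON array maps to an ARRAY type
  tags ARRAY<STRING>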

CREATE TABLE parquet (
  -- define the schema of your parquet data
) WITH (
  'connector' = 'filesystem',
  'path' = '/tmp/parquet',
  'format' = 'parquet'
);

You might also want to have a look at the LIKE clause [3] to define the
schema of your parquet table if it is mostly similar to the JSON schema.
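
A sketch, assuming the two schemas are identical (EXCLUDING OPTIONS
copies the columns from the json table but not its connector options,
so you can set the filesystem ones):

CREATE TABLE parquet WITH (
  'connector' = 'filesystem',
  'path' = '/tmp/parquet',
  'format' = 'parquet'
) LIKE json (EXCLUDING OPTIONS);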

INSERT INTO parquet SELECT /*transform your data*/ FROM json;
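
With nested ROW columns you can address inner fields with dot notation
in the SELECT list, e.g. (again with made-up field names):

INSERT INTO parquet
SELECT
  id,
  payload.name AS user_name,           -- flatten a nested field
  UPPER(payload.address.city) AS city  -- sample modification/enrichment
FROM json;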

Best,

Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#how-to-create-a-table-with-json-format
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/parquet.html#how-to-create-a-table-with-parquet-format
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table

On 21/08/2020 02:40, Averell wrote:
> Hello,
>
> I have a stream where each message is a JSON string with a quite complex
> schema (multiple fields, multiple nested layers), and I need to write that
> into Parquet files after some slight modifications/enrichment.
>
> I wonder what options are available for me to do that. I'm thinking of JSON
> -> Avro (GenericRecord) -> Parquet. Is that an option? I would want to be
> able to quickly/dynamically (with as little code change as possible) change
> the JSON schema.
>
> Thanks and regards,
> Averell
>
