Hey Flinksters,
I'm reading a nested JSON object into a table and would like to access the
nested rows inside an array. Is there a way to flatten them so that I get a
table with the nested rows?
So far, I've only figured out how to access a specific element of the
array using the "at" method. What I want instead is to flatten the nested
rows into a table, and the arrays can have variable length. Below is a code
snippet of what I have so far; note that it only accesses the first
element of each array.
How do you do this in Flink? Apologies if this is obvious - I wasn't able
to find an example or documentation and would appreciate any help.
Thank you,
Matthias
---------------
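import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Schema of orders.json: each order carries an array of nested entry rows.
TableDescriptor jsonTable = TableDescriptor.forConnector("filesystem")
    .schema(Schema.newBuilder()
        .column("id", DataTypes.INT())
        .column("customerid", DataTypes.INT())
        .column("time", DataTypes.INT())
        .column("entries", DataTypes.ARRAY(DataTypes.ROW(
            DataTypes.FIELD("productid", DataTypes.INT()),
            DataTypes.FIELD("quantity", DataTypes.INT()),
            DataTypes.FIELD("unit_price", DataTypes.DECIMAL(9, 3)),
            DataTypes.FIELD("discount", DataTypes.DECIMAL(9, 3))
        )))
        .build())
    .option("path",
        C360Test.RETAIL_DATA_DIR.toAbsolutePath() + "/orders.json")
    .format("json")
    .build();

tEnv.createTable("Orders", jsonTable);
Table orders = tEnv.from("Orders");

// at(1) picks the first array element (indexes start at 1), so this only
// sums the quantity of the first entry of every order.
Table flattenEntries = orders.select(
    $("entries").at(1).get("quantity").sum().as("totalquant"));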
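
To make the goal concrete, here is a plain-Java sketch (no Flink involved)
of the result I'm after: one output row per (order, entry) pair, however
many entries an order has. The class and record names (FlattenSketch, Entry,
Order, FlatRow) and the sample values are made up for illustration, and
unit_price and discount are left out to keep it short.

```java
import java.util.ArrayList;
import java.util.List;

public class FlattenSketch {
    // Hypothetical stand-ins for one element of "entries" and for one order.
    record Entry(int productid, int quantity) {}
    record Order(int id, int customerid, List<Entry> entries) {}

    // The flattened shape: order columns repeated next to each entry's columns.
    record FlatRow(int id, int customerid, int productid, int quantity) {}

    // Emits one FlatRow per entry; an order with an empty array yields no rows.
    static List<FlatRow> flatten(List<Order> orders) {
        List<FlatRow> rows = new ArrayList<>();
        for (Order o : orders) {
            for (Entry e : o.entries()) {
                rows.add(new FlatRow(o.id(), o.customerid(),
                                     e.productid(), e.quantity()));
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        // Made-up sample data: two orders with arrays of different length.
        List<Order> orders = List.of(
            new Order(1, 100, List.of(new Entry(7, 2), new Entry(8, 1))),
            new Order(2, 101, List.of(new Entry(7, 5))));
        flatten(orders).forEach(System.out::println);
    }
}
```

With a table shaped like this, summing "quantity" would cover every entry
rather than just the first one in each array.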