Interesting, yeah I think you'll have to implement code to recurse through
the (Row) DataType and somehow auto-generate the JSONSchema you want.

We abstracted the conversions from JSONSchema to other type systems in this
JsonSchemaConverter
<https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities/src/main/java/org/wikimedia/eventutilities/core/event/types/JsonSchemaConverter.java>.
There's nothing special going on here; I've seen versions of this schema
conversion code over and over again in different frameworks. This one just
allows us to plug in a SchemaConversions
<https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities/src/main/java/org/wikimedia/eventutilities/core/event/types/SchemaConversions.java>
implementation to provide the mappings to the output type system (like the
Flink DataType mappings
<https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/DataTypeSchemaConversions.java>
I linked to before), rather than hardcoding the output types.
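
To make the shape of that concrete, here is a rough paraphrase (typed from
memory rather than copied from the repo, so treat the method names as
approximate; see the linked interface for the real ones):

import java.util.List;

// One callback per abstract schema type; each returns that type's
// representation in the output type system T (e.g. a Flink DataType,
// or a Jackson JsonNode holding a JSONSchema fragment).
public interface SchemaConversions<T> {
    T typeString();
    T typeBoolean();
    T typeInteger();
    T typeDouble();
    T typeArray(T elementType);
    T typeMap(T keyType, T valueType);
    T typeRow(List<RowField<T>> fields);

    // A named, typed field of a row/object.
    class RowField<T> {
        public final String name;
        public final T type;

        public RowField(String name, T type) {
            this.name = name;
            this.type = type;
        }
    }
}

The converter owns the recursion over the input schema; the
SchemaConversions implementation owns the leaf type mappings.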

If I were trying to do what you are doing (in our codebase)...I'd create a
Flink DataTypeConverter<T> that iterated through a (Row) DataType, and a
SchemaConversions<JsonNode> implementation that mapped to the JsonNode
representing the JSONSchema. (If not using Jackson...then you could use
another Java JSON object instead of JsonNode.)
You could also make a SchemaConversions<ProtobufSchema> (with whatever
Protobuf class makes sense...I'm not familiar with Protobuf) and then use
the same DataTypeConverter to convert to ProtobufSchema. AND THEN...I'd
wonder if the input schema recursion code itself could be abstracted too,
so that it would work for either JSONSchema OR DataType OR whatever. But
anyway, that is probably too crazy and too much for what you are
doing...but it would be cool! :p
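
Something like this, as a bare-bones sketch (the class name is made up, and
I've collapsed the recursion and the type mappings into one switch for
brevity; a real version would delegate each case to the
SchemaConversions<JsonNode> implementation, and would also handle
nullability, maps, decimals, timestamps, etc.):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

// Hypothetical: walk a (Row) LogicalType and build the equivalent
// JSONSchema as a Jackson ObjectNode.
public class DataTypeToJsonSchema {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static ObjectNode convert(LogicalType type) {
        ObjectNode schema = MAPPER.createObjectNode();
        switch (type.getTypeRoot()) {
            case BOOLEAN:
                schema.put("type", "boolean");
                break;
            case TINYINT:
            case SMALLINT:
            case INTEGER:
            case BIGINT:
                schema.put("type", "integer");
                break;
            case FLOAT:
            case DOUBLE:
                schema.put("type", "number");
                break;
            case CHAR:
            case VARCHAR:
                schema.put("type", "string");
                break;
            case ARRAY:
                // JSONSchema arrays describe their element type via "items".
                schema.put("type", "array");
                schema.set("items", convert(((ArrayType) type).getElementType()));
                break;
            case ROW:
                // Each row field becomes a property of a JSONSchema object.
                schema.put("type", "object");
                ObjectNode properties = schema.putObject("properties");
                for (RowType.RowField field : ((RowType) type).getFields()) {
                    properties.set(field.getName(), convert(field.getType()));
                }
                break;
            default:
                throw new IllegalArgumentException("No JSONSchema mapping for " + type);
        }
        return schema;
    }
}

Starting from the ResolvedSchema you already have, you'd call it like:

ObjectNode jsonSchema =
    DataTypeToJsonSchema.convert(
        resultSchema.toSinkRowDataType().getLogicalType());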

On Wed, Nov 9, 2022 at 9:52 AM Theodor Wübker <theo.wueb...@inside-m2m.de>
wrote:

> I want to register the result-schema in a schema registry, as I am pushing
> the result-data to a Kafka topic. The result-schema is not known at
> compile-time, so I need to find a way to compute it at runtime from the
> resulting Flink Schema.
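>
> For the registry part, what I have in mind is roughly this - a minimal
> sketch assuming Confluent's Java client, where jsonSchemaString holds the
> JSONSchema computed at runtime and the subject name is a placeholder:
>
> import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
> import io.confluent.kafka.schemaregistry.json.JsonSchema;
>
> // Register the runtime-computed JSONSchema under the topic's value subject.
> CachedSchemaRegistryClient client =
>     new CachedSchemaRegistryClient("http://localhost:8081", 100);
> client.register("result-topic-value", new JsonSchema(jsonSchemaString));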
>
> -Theo
>
> (resent - again sorry, I forgot to add the others in the cc)
>
> On 9. Nov 2022, at 14:59, Andrew Otto <o...@wikimedia.org> wrote:
>
> >  I want to convert the schema of a Flink table to both Protobuf *schema* and
> > JSON *schema*
> Oh, you want to convert from Flink Schema TO JSONSchema?  Interesting.
> That would indeed be something that is not usually done.  Just curious, why
> do you want to do this?
>
> On Wed, Nov 9, 2022 at 8:46 AM Andrew Otto <o...@wikimedia.org> wrote:
>
>> Hello!
>>
>> I see you are talking about JSONSchema, not just JSON itself.
>>
>> We're trying to do a similar thing at Wikimedia and have developed some
>> tooling around this.
>>
>> JsonSchemaFlinkConverter
>> <https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/JsonSchemaFlinkConverter.java>
>> has some logic to convert from JSONSchema Jackson ObjectNodes to Flink
>> Table DataType or Table SchemaBuilder, or Flink DataStream
>> TypeInformation<Row>.  Some of the conversions from JSONSchema to Flink
>> type are opinionated.  You can see the mappings here
>> <https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/DataTypeSchemaConversions.java>.
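>>
>> For example, usage is along these lines (writing this from memory, so
>> treat the exact method name as an assumption and check the linked class
>> for the real signatures):
>>
>> ObjectNode jsonSchema =
>>     (ObjectNode) new ObjectMapper().readTree(jsonSchemaString);
>> DataType dataType = JsonSchemaFlinkConverter.toDataType(jsonSchema);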
>>
>> On Wed, Nov 9, 2022 at 2:33 AM Theodor Wübker <theo.wueb...@inside-m2m.de>
>> wrote:
>>
>>> Thanks for your reply Yaroslav! The way I do it with Avro seems similar
>>> to what you pointed out:
>>>
>>> ResolvedSchema resultSchema = resultTable.getResolvedSchema();
>>> DataType type = resultSchema.toSinkRowDataType();
>>> org.apache.avro.Schema converted =
>>>     AvroSchemaConverter.convertToSchema(type.getLogicalType());
>>>
>>> I mentioned the ResolvedSchema because it is my starting point after the
>>> SQL operation. It seemed to me that I cannot retrieve anything containing
>>> more schema information from the table, so this is what I use. About your
>>> other answers: it seems the classes you mentioned can be used to serialize
>>> actual data? However, this is not quite what I want to do. Essentially, I
>>> want to convert the schema of a Flink table to both a Protobuf *schema*
>>> and a JSON *schema* (for Avro, as you can see, I have it already). It
>>> seems odd that this is not easily possible, given that converting from a
>>> JSON schema to a Flink schema is possible using the JsonRowSchemaConverter.
>>> The other way around, however, does not seem to be implemented.
>>> This is how I got a Table Schema (that I can use in a table descriptor)
>>> from a JSON schema:
>>>
>>> TypeInformation<Row> type = JsonRowSchemaConverter.convert(json);
>>> DataType row = TableSchema.fromTypeInfo(type).toPhysicalRowDataType();
>>> Schema schema = Schema.newBuilder().fromRowDataType(row).build();
>>>
>>> Sidenote: I use deprecated methods here, so if there is a better
>>> approach, please let me know! But it shows that in Flink it's easily
>>> possible to create a Schema for a TableDescriptor from a JSON schema -
>>> the other way around is just not so trivial, it seems. And for Protobuf
>>> I have no solution so far, not even for creating a Flink Schema from a
>>> Protobuf schema - not to mention the other way around.
>>>
>>> -Theo
>>>
>>> (resent because I accidentally only responded to you, not the Mailing
>>> list - sorry)