Re: Handling JSON Serialization without Kryo

2023-03-27 Thread Andrew Otto
Hi,

> The problem here is that the shape of the data can vary wildly and
dynamically. Some records may have properties unique to only that record,
which makes defining a POJO difficult

AFAIK, the only way to avoid POJOs in Flink is to use Row (DataStream) or
RowData (Table API).  These make up Flink's 'schema' type system, and they allow
you to dynamically describe the shape of data.
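
For example, here is a rough sketch of what that can look like on the DataStream
side (this assumes Jackson on the classpath and that you can at least name the
fields you care about up front; the field names here are made up):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

public class JsonToRowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for your Kafka source.
        DataStream<String> json = env.fromElements(
            "{\"id\":\"a1\",\"timestamp\":1679000000,\"extra\":\"only-on-this-record\"}",
            "{\"id\":\"b2\",\"timestamp\":1679000100}");

        DataStream<Row> rows = json
            .map(str -> {
                // In real code, reuse a single ObjectMapper (e.g. in a RichMapFunction).
                JsonNode node = new ObjectMapper().readTree(str);
                Row row = Row.withNames();
                row.setField("id", node.path("id").asText());
                row.setField("ts", node.path("timestamp").asLong());
                // Carry the full payload along for the fields you don't model.
                row.setField("raw", str);
                return row;
            })
            // Explicit type info keeps Flink from falling back to Kryo for Row.
            .returns(Types.ROW_NAMED(
                new String[] {"id", "ts", "raw"},
                Types.STRING, Types.LONG, Types.STRING));

        rows.print();
        env.execute("json-to-row-sketch");
    }
}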

At the Wikimedia Foundation, we also find it difficult to use JSON +
Flink.  We at least have JSONSchemas though, and that has allowed us to
write custom converters (serdes) for JSONSchema -> Row & RowData.
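
The converters themselves mostly just walk the JSONSchema and map each property
to a Flink type.  A very stripped-down sketch of the idea (top-level primitive
properties only, no nesting or arrays, and not our actual code):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class JsonSchemaToRowType {

    /** Builds a named Row type from the top-level "properties" of a JSONSchema. */
    public static TypeInformation<Row> rowTypeOf(String jsonSchema) throws Exception {
        JsonNode properties = new ObjectMapper().readTree(jsonSchema).path("properties");
        List<String> names = new ArrayList<>();
        List<TypeInformation<?>> types = new ArrayList<>();

        Iterator<Map.Entry<String, JsonNode>> fields = properties.fields();
        while (fields.hasNext()) {
            Map.Entry<String, JsonNode> field = fields.next();
            names.add(field.getKey());
            types.add(typeOf(field.getValue().path("type").asText()));
        }
        return Types.ROW_NAMED(
            names.toArray(new String[0]),
            types.toArray(new TypeInformation<?>[0]));
    }

    private static TypeInformation<?> typeOf(String jsonSchemaType) {
        switch (jsonSchemaType) {
            case "string":  return Types.STRING;
            case "integer": return Types.LONG;
            case "number":  return Types.DOUBLE;
            case "boolean": return Types.BOOLEAN;
            // Punt on objects/arrays here and keep them as raw JSON strings.
            default:        return Types.STRING;
        }
    }
}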

This might not be helpful for you (unless you also use JSONSchema), but
here are a couple of conversations on this list with some relevant info:

- Flink, JSON, and JSONSchemas

- Converting ResolvedSchema to JSON and Protobuf Schemas

Good luck!
-Andrew Otto
 Wikimedia Foundation

On Wed, Mar 22, 2023 at 8:07 AM Rion Williams  wrote:

> Hi Ken,
>
> I’m going to profile the job today to try and get a better handle on where
> the bottleneck is. The job currently just passes around JsonObjects between
> the operators, which are relying on Kryo. The job also writes to Postgres,
> Kafka, and Elasticsearch so it’s possible that one of those is causing the
> back-pressure.
>
> I’m a bit shocked at the stunningly low speeds as well. Initially, the job
> would perform fine but checkpointing sizes would gradually build up (as
> would durations for them) until performance degraded to the borderline
> unusable 1-2 records/second.
>
> On Mar 21, 2023, at 2:35 PM, Ken Krugler 
> wrote:
>
> Hi Rion,
>
> I’m using Gson to deserialize to a Map.
>
> 1-2 records/second sounds way too slow, unless each record is enormous.
>
> — Ken
>
> On Mar 21, 2023, at 6:18 AM, Rion Williams  wrote:
>
> Hi Ken,
>
> Thanks for the response. I hadn't tried exploring the use of the Record
> class, which I'm assuming refers to flink.types.Record, to read the JSON
> into. Did you handle this by using a mapper (e.g. Gson, Jackson) to read the
> properties in as fields, or did you take a different approach?
> Additionally, how has your experience been with performance? Kryo with the
> existing job leveraging JsonObjects (via Gson) is horrific (~1-2
> records/second) and can't keep up with the speed of the producers, which is
> the impetus behind reevaluating the serialization.
>
> I'll explore this a bit more.
>
> Thanks,
>
> Rion
>
> On Mon, Mar 20, 2023 at 10:28 PM Ken Krugler 
> wrote:
>
>> Hi Rion,
>>
>> For my similar use case, I was able to make a simplifying assumption that
>> my top-level JSON object was a record.
>>
>> I then registered a custom Kryo serde that knew how to handle the handful
>> of JsonPrimitive types for the record entries.
>>
>> I recently looked at extending that to support arrays and nested records,
>> but haven’t had to do that.
>>
>> — Ken
>>
>>
>> On Mar 20, 2023, at 6:56 PM, Rion Williams  wrote:
>>
>> Hi Shammon,
>>
>> Unfortunately it’s a data stream job. I’ve been exploring a few options
>> but haven’t found anything I’ve decided on yet. I’m currently looking at
>> seeing if I can leverage some type of partial serialization to bind to the
>> properties that I know the job will use and retain the rest as a JSON blob.
>> I’ve also considered trying to store the fields as a large map of
>> string-object pairs and translating that into a string prior to writing to
>> the sinks.
>>
>> Still accepting any/all ideas that I come across to see if I can handle
>> this in an efficient, reasonable way.
>>
>> Thanks,
>>
>> Rion
>>
>> On Mar 20, 2023, at 8:40 PM, Shammon FY  wrote:
>>
>> 
>> Hi Rion
>>
>> Is your job a DataStream or Table/SQL job? If it is a Table/SQL job and you
>> can define all the JSON fields you need, then you can directly use the JSON
>> format [1] to parse the data.
>>
>> You can also write custom UDFs to parse the JSON data into structured data,
>> such as MAP, ROW, and other types supported by Flink.
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/
>>
>> Best,
>> Shammon FY
>>
>>
>> On Sun, Mar 19, 2023 at 7:44 AM Rion Williams 
>> wrote:
>>
>>> Hi all,
>>>
>>> I’m reaching out today for some suggestions (and hopefully a solution)
>>> for a Flink job that I’m working on. The job itself reads JSON strings from
>>> a Kafka topic and reads those into JSONObjects (currently via Gson), which
>>> are then operated against, before ultimately being written out to Kafka
>>> again.
>>>
>>> The problem here is that the shape of the data can vary wildly and
>>> dynamically. Some records may have properties unique to only that record,
>>> which makes defining a POJO difficult. In addition to this, the JSONObjects
>>> fall back to Kryo serialization, which is leading to atrocious throughput.
>>>
>>> I basically need to read in JSON strings, enrich properties on these
>>> objects, and ultimately write them to various sinks.

Re: Handling JSON Serialization without Kryo

2023-03-21 Thread Ken Krugler
Hi Rion,

I’m using Gson to deserialize to a Map.
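
If it helps, the general shape of that setup is roughly the following. This is a
from-memory sketch rather than the actual code, and it assumes the top-level
object is flat with only primitive values:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.gson.Gson;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import com.google.gson.reflect.TypeToken;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.lang.reflect.Type;
import java.util.Map;

public class GsonMapSketch {

    /** Kryo serde for the JsonPrimitive values held in the Map. */
    public static class JsonPrimitiveSerializer extends Serializer<JsonPrimitive> {
        @Override
        public void write(Kryo kryo, Output output, JsonPrimitive value) {
            // toString() preserves the JSON form (quoted string, number, boolean).
            output.writeString(value.toString());
        }

        @Override
        public JsonPrimitive read(Kryo kryo, Input input, Class<JsonPrimitive> type) {
            return JsonParser.parseString(input.readString()).getAsJsonPrimitive();
        }
    }

    /** Parses one flat JSON object into a map of field name -> primitive value. */
    public static class ParseJson implements MapFunction<String, Map<String, JsonPrimitive>> {
        private static final Gson GSON = new Gson();
        private static final Type MAP_TYPE =
            new TypeToken<Map<String, JsonPrimitive>>() {}.getType();

        @Override
        public Map<String, JsonPrimitive> map(String json) {
            return GSON.fromJson(json, MAP_TYPE);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The Map type still goes through Kryo, but with a cheap, known serde for the values.
        env.getConfig().registerTypeWithKryoSerializer(
            JsonPrimitive.class, JsonPrimitiveSerializer.class);

        env.fromElements("{\"id\":\"a1\",\"count\":3,\"flag\":true}")
            .map(new ParseJson())
            .print();

        env.execute("gson-map-sketch");
    }
}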

1-2 records/second sounds way too slow, unless each record is enormous.

— Ken

> On Mar 21, 2023, at 6:18 AM, Rion Williams  wrote:
> 
> Hi Ken,
> 
> Thanks for the response. I hadn't tried exploring the use of the Record 
> class, which I'm assuming refers to flink.types.Record, to read the JSON 
> into. Did you handle this by using a mapper (e.g. Gson, Jackson) to read the 
> properties in as fields, or did you take a different approach? Additionally, 
> how has your experience been with performance? Kryo with the existing job 
> leveraging JsonObjects (via Gson) is horrific (~1-2 records/second) and can't 
> keep up with the speed of the producers, which is the impetus behind 
> reevaluating the serialization.
> 
> I'll explore this a bit more.
> 
> Thanks,
> 
> Rion
> 
> On Mon, Mar 20, 2023 at 10:28 PM Ken Krugler wrote:
> Hi Rion,
> 
> For my similar use case, I was able to make a simplifying assumption that my 
> top-level JSON object was a record.
> 
> I then registered a custom Kryo serde that knew how to handle the handful of 
> JsonPrimitive types for the record entries.
> 
> I recently looked at extending that to support arrays and nested records, but 
> haven’t had to do that.
> 
> — Ken
> 
> 
>> On Mar 20, 2023, at 6:56 PM, Rion Williams wrote:
>> 
>> Hi Shammon,
>> 
>> Unfortunately it’s a data stream job. I’ve been exploring a few options but 
>> haven’t found anything I’ve decided on yet. I’m currently looking at seeing 
>> if I can leverage some type of partial serialization to bind to the 
>> properties that I know the job will use and retain the rest as a JSON blob. 
>> I’ve also considered trying to store the fields as a large map of 
>> string-object pairs and translating that into a string prior to writing to 
>> the sinks.
>> 
>> Still accepting any/all ideas that I come across to see if I can handle this 
>> in an efficient, reasonable way.
>> 
>> Thanks,
>> 
>> Rion
>> 
>>> On Mar 20, 2023, at 8:40 PM, Shammon FY wrote:
>>> 
>>> 
>>> Hi Rion
>>> 
>>> Is your job a DataStream or Table/SQL job? If it is a Table/SQL job and you can 
>>> define all the JSON fields you need, then you can directly use the JSON 
>>> format [1] to parse the data. 
>>> 
>>> You can also write custom UDFs to parse the JSON data into structured data, 
>>> such as MAP, ROW, and other types supported by Flink.
>>> 
>>> 
>>> [1] 
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/
>>>  
>>> 
>>> 
>>> Best,
>>> Shammon FY
>>> 
>>> 
>>> On Sun, Mar 19, 2023 at 7:44 AM Rion Williams wrote:
>>> Hi all,
>>> 
>>> I’m reaching out today for some suggestions (and hopefully a solution) for 
>>> a Flink job that I’m working on. The job itself reads JSON strings from a 
>>> Kafka topic and reads those into JSONObjects (currently via Gson), which 
>>> are then operated against, before ultimately being written out to Kafka 
>>> again.
>>> 
>>> The problem here is that the shape of the data can vary wildly and 
>>> dynamically. Some records may have properties unique to only that record, 
>>> which makes defining a POJO difficult. In addition to this, the JSONObjects 
>>> fall back to Kryo serialization, which is leading to atrocious throughput.
>>> 
>>> I basically need to read in JSON strings, enrich properties on these 
>>> objects, and ultimately write them to various sinks.  Is there some type of 
>>> JSON-based class or library or an approach I could use to accomplish this 
>>> in an efficient manner? Or possibly a way to partially write a POJO that 
>>> would allow me to interact with sections/properties of the JSON while 
>>> retaining other properties that might be dynamically present or unique to 
>>> the message?
>>> 
>>> Any advice or suggestions would be welcome! I’ll also be happy to provide 
>>> any additional context if it would help!
>>> 
>>> Thanks,
>>> 
>>> Rion
>>> 
>>> (cross-posted to users+dev for reach)
> 
> --
> Ken Krugler
> http://www.scaleunlimited.com 
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
> 
> 
> 

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch






Re: Handling JSON Serialization without Kryo

2023-03-20 Thread Shammon FY
Hi Rion

Is your job a DataStream or Table/SQL job? If it is a Table/SQL job and you can
define all the JSON fields you need, then you can directly use the JSON
format [1] to parse the data.

You can also write custom UDFs to parse the JSON data into structured data,
such as MAP, ROW, and other types supported by Flink.


[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/
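
For example, a minimal sketch of the json format option (the table name, columns
and Kafka settings below are only placeholders):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JsonFormatSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());

        // Only declare the JSON fields the job needs; the json format simply
        // ignores properties that are not part of the declared schema.
        tEnv.executeSql(
            "CREATE TABLE events (" +
            "  id STRING," +
            "  `timestamp` BIGINT," +
            "  labels MAP<STRING, STRING>" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'events'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'scan.startup.mode' = 'earliest-offset'," +
            "  'format' = 'json'," +
            "  'json.ignore-parse-errors' = 'true'" +
            ")");

        tEnv.executeSql("SELECT id, labels FROM events").print();
    }
}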

Best,
Shammon FY


On Sun, Mar 19, 2023 at 7:44 AM Rion Williams  wrote:

> Hi all,
>
> I’m reaching out today for some suggestions (and hopefully a solution) for
> a Flink job that I’m working on. The job itself reads JSON strings from a
> Kafka topic and reads those into JSONObjects (currently via Gson), which
> are then operated against, before ultimately being written out to Kafka
> again.
>
> The problem here is that the shape of the data can vary wildly and
> dynamically. Some records may have properties unique to only that record,
> which makes defining a POJO difficult. In addition to this, the JSONObjects
> fall back to Kryo serialization, which is leading to atrocious throughput.
>
> I basically need to read in JSON strings, enrich properties on these
> objects, and ultimately write them to various sinks.  Is there some type of
> JSON-based class or library or an approach I could use to accomplish this
> in an efficient manner? Or possibly a way to partially write a POJO that
> would allow me to interact with sections/properties of the JSON while
> retaining other properties that might be dynamically present or unique to
> the message?
>
> Any advice or suggestions would be welcome! I’ll also be happy to provide
> any additional context if it would help!
>
> Thanks,
>
> Rion
>
> (cross-posted to users+dev for reach)


Handling JSON Serialization without Kryo

2023-03-18 Thread Rion Williams
Hi all,

I’m reaching out today for some suggestions (and hopefully a solution) for a 
Flink job that I’m working on. The job itself reads JSON strings from a Kafka 
topic and parses those into JSONObjects (currently via Gson), which are then 
operated against, before ultimately being written out to Kafka again.

The problem here is that the shape of the data can vary wildly and dynamically. 
Some records may have properties unique to only that record, which makes 
defining a POJO difficult. In addition to this, the JSONObjects fall back to Kryo 
serialization, which is leading to atrocious throughput.

I basically need to read in JSON strings, enrich properties on these objects, 
and ultimately write them to various sinks.  Is there some type of JSON-based 
class or library or an approach I could use to accomplish this in an efficient 
manner? Or possibly a way to partially write a POJO that would allow me to 
interact with sections/properties of the JSON while retaining other properties 
that might be dynamically present or unique to the message?
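
For the last idea, the rough shape I have in mind is something like the following
(using Jackson purely as an example; the known fields are made up):

import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;

public class PartialEvent {
    // The handful of fields the job actually needs to read or enrich.
    public String id;
    public long timestamp;

    // Everything else lands here and is written back out untouched.
    // (Flink would still treat this map as a generic type, so this only
    // covers the JSON binding, not the serializer question by itself.)
    private final Map<String, Object> other = new HashMap<>();

    @JsonAnySetter
    public void setOther(String name, Object value) {
        other.put(name, value);
    }

    @JsonAnyGetter
    public Map<String, Object> getOther() {
        return other;
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        PartialEvent event = mapper.readValue(
            "{\"id\":\"a1\",\"timestamp\":1679000000,\"surprise\":{\"nested\":true}}",
            PartialEvent.class);
        event.id = event.id.toUpperCase();                    // work with the known fields
        System.out.println(mapper.writeValueAsString(event)); // unknown fields survive
    }
}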

Any advice or suggestions would be welcome! I’ll also be happy to provide any 
additional context if it would help!

Thanks,

Rion

(cross-posted to users+dev for reach)
