Re: json mapper
I think your snippet looks good. The Jackson ObjectMapper is designed to be reused by numerous threads, and is routinely stored as a static field. It is somewhat expensive to create.

Hope this helps,
-Eron

On Thu, Aug 3, 2017 at 7:46 AM, Nico Kruber wrote:
> Hi Peter,
> [...]
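Eron's point about reuse can be illustrated with a minimal sketch, assuming jackson-databind 2.x on the classpath: one ObjectMapper held in a Scala object and shared by several threads at once. SharedJson and SharedJsonDemo are hypothetical names. The Jackson javadoc describes mapper instances as thread-safe provided all configuration happens before any read or write call, so any configuration belongs where the mapper is created, not in map().

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical holder: a Scala object is initialised once per class loader,
// so every caller in that class loader shares the same mapper instance.
// Configure it here, before the first read/write, and never afterwards.
object SharedJson {
  val mapper: ObjectMapper = new ObjectMapper()

  def parse(json: String): ObjectNode =
    mapper.readValue(json, classOf[ObjectNode])
}

// Hypothetical demo: parse several documents concurrently through the
// single shared mapper and wait for all results.
object SharedJsonDemo {
  def parseAll(docs: Seq[String]): Seq[ObjectNode] = {
    val futures = docs.map(doc => Future(SharedJson.parse(doc)))
    Await.result(Future.sequence(futures), 10.seconds)
  }
}
```

Sharing is only safe here because the mapper is never reconfigured after construction; a function that calls configure(...) or registerModule(...) at runtime would break that assumption.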
Re: json mapper
Hi Peter,

I'm no Scala developer, but I may be able to help with some concepts:

* a static reference used inside a [Map]Function can cause problems when it is executed in parallel in the same JVM, e.g. on a TaskManager with multiple slots, depending on whether this static object is stateful and/or thread-safe
* additionally, not all parallel instances of your map may be executed in the same JVM, e.g. on multiple TaskManagers, so you cannot assume that the state of the JsonMapper is consistent among them
* if the ObjectMapper does not store any state that is worth recovering during a failure (none that I see from https://fasterxml.github.io/jackson-databind/javadoc/2.3.0/com/fasterxml/jackson/databind/ObjectMapper.html, if that is the one you are using), then you don't need to put it into Flink state but can either initialise it as a (non-static) member of your MapFunction class or even in your map function itself
* for the correct use of keyed/non-keyed state, please refer to my other email or [1]
* for 'class' vs. 'object': if you're using com.fasterxml.jackson.databind.ObjectMapper as described above, you'll have state again ("It will use instances of JsonParser and JsonGenerator for implementing actual reading/writing of JSON." from the docs), but in general it is a good question whether the singleton would work for stateless operators and whether it actually improves performance.

Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html

On Thursday, 3 August 2017 12:41:36 CEST Peter Ertl wrote:
> Hi flink users,
> [...]
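Nico's suggestion of a non-static member could look roughly like this, assuming Flink's MapFunction and jackson-databind 2.x; JsonMemberMapper is a hypothetical name. The mapper is created when the function object is constructed, travels with the function when Flink serializes it (ObjectMapper implements java.io.Serializable, as Peter notes), and each parallel instance ends up with its own copy, so nothing is shared across slots or JVMs.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.api.common.functions.MapFunction

// Hypothetical function: the mapper is an ordinary (non-static, non-transient)
// member, so it is serialized together with the function object and every
// parallel instance deserializes its own private mapper.
class JsonMemberMapper extends MapFunction[String, ObjectNode] {
  private val mapper: ObjectMapper = new ObjectMapper()

  // Parse one JSON string into a Jackson tree node.
  override def map(value: String): ObjectNode =
    mapper.readValue(value, classOf[ObjectNode])
}
```

Creating the mapper inside map() itself, the other option Nico mentions, also works but pays the construction cost Eron describes on every record.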
json mapper
Hi Flink users,

I just wanted to ask if this kind of Scala map function is correct?

  import com.fasterxml.jackson.databind.ObjectMapper
  import com.fasterxml.jackson.databind.node.ObjectNode
  import org.apache.flink.api.common.functions.MapFunction

  object JsonMapper {
    private val mapper: ObjectMapper = new ObjectMapper()
  }

  class JsonMapper extends MapFunction[String, ObjectNode] {
    override def map(value: String): ObjectNode =
      JsonMapper.mapper.readValue(value, classOf[ObjectNode])
  }

Is using a static reference to ObjectMapper fine, or will this cause issues on a distributed cluster / with checkpoints / when serializing state / whatever?

Or should I instead use a non-transient property initialized in the constructor (ObjectMapper is java.io.Serializable)?

Or should I initialize it in RichMapFunction.open into a transient property?

I am also wondering whether replacing 'class' with 'object' (=> singleton)

  object JsonMapper extends MapFunction[String, ObjectNode] { /* ..*/ }

is ok (the mapper is actually stateless, so there is no obvious need to re-instantiate it again and again?)

Thanks and best regards
Peter
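Peter's third option, a transient property initialized in RichMapFunction.open, might be sketched as follows. This assumes the Flink 1.x API referenced in the thread (1.3), where RichFunction.open takes a Configuration; newer Flink releases may deprecate or change that signature. JsonOpenMapper is a hypothetical name.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Hypothetical function: @transient keeps the mapper out of the serialized
// function, so each parallel instance builds its own in open(), which Flink
// calls once per instance before the first map() call.
class JsonOpenMapper extends RichMapFunction[String, ObjectNode] {
  @transient private var mapper: ObjectMapper = _

  override def open(parameters: Configuration): Unit = {
    mapper = new ObjectMapper()
  }

  // Parse one JSON string using this instance's mapper.
  override def map(value: String): ObjectNode =
    mapper.readValue(value, classOf[ObjectNode])
}
```

Compared with a plain member, this variant does not depend on ObjectMapper being Serializable at all, which matters if the mapper is later swapped for a helper that is not.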