I think your snippet looks good. The Jackson ObjectMapper is designed to be reused by numerous threads (as long as it is fully configured before the first read or write call) and is routinely stored as a static field. It is also somewhat expensive to create.
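To make the reuse point concrete, here is a minimal sketch, assuming jackson-databind on the classpath. `SharedMapper` and `parseAll` are hypothetical names, not part of any API. A single ObjectMapper held in a Scala `object` (the equivalent of a static field) is configured once and then used by several threads at the same time to parse JSON.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode

// Hypothetical holder: one mapper per classloader, like a static field.
object SharedMapper {
  // All configuration must happen here, before any read/write call;
  // after that the same instance can be shared between threads.
  val mapper: ObjectMapper = new ObjectMapper()

  // Parses every input on a small thread pool using the one shared mapper
  // and returns the "id" field of each document, in input order.
  def parseAll(inputs: Seq[String], threads: Int = 4): Seq[Int] = {
    val pool = Executors.newFixedThreadPool(threads)
    try {
      // Submit all tasks first so they really run concurrently.
      val futures = inputs.map { json =>
        pool.submit(new Callable[Int] {
          override def call(): Int =
            mapper.readValue(json, classOf[ObjectNode]).get("id").asInt()
        })
      }
      // Block for each result in submission order.
      futures.map(_.get())
    } finally {
      pool.shutdown()
      pool.awaitTermination(10, TimeUnit.SECONDS)
    }
  }
}
```

Creating the mapper once and sharing it avoids paying the construction cost per record or per thread.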
Hope this helps,
-Eron

On Thu, Aug 3, 2017 at 7:46 AM, Nico Kruber <n...@data-artisans.com> wrote:
> Hi Peter,
> I'm no Scala developer but I may be able to help with some concepts:
>
> * a static reference used inside a [Map]Function can cause problems when executed in parallel in the same JVM, e.g. a TaskManager with multiple slots, depending on whether this static object is stateful and/or thread-safe
> * additionally, not all parallel instances of your map may be executed in the same JVM, e.g. on multiple TaskManagers, so you cannot assume that the state of the JsonMapper is consistent among them
> * if the ObjectMapper does not store any state that is worth recovering during a failure (none that I see from https://fasterxml.github.io/jackson-databind/javadoc/2.3.0/com/fasterxml/jackson/databind/ObjectMapper.html if that is the one you are using), then you don't need to put it into Flink state but can either initialise it as a (non-static) member of your MapFunction class or even in your map function itself
> * for the correct use of keyed/non-keyed state, please refer to my other email or [1]
> * for 'class' vs. 'object': if you're using com.fasterxml.jackson.databind.ObjectMapper as described above, you'll have state again ("It will use instances of JsonParser and JsonGenerator for implementing actual reading/writing of JSON." from the docs) but in general, it is a good question whether the singleton would work for stateless operators and whether it actually improves performance.
>
>
> Nico
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
>
> On Thursday, 3 August 2017 12:41:36 CEST Peter Ertl wrote:
> > Hi Flink users,
> >
> > I just wanted to ask if this kind of Scala map function is correct?
> >
> >   import com.fasterxml.jackson.databind.ObjectMapper
> >   import com.fasterxml.jackson.databind.node.ObjectNode
> >   import org.apache.flink.api.common.functions.MapFunction
> >
> >   object JsonMapper {
> >     private val mapper: ObjectMapper = new ObjectMapper()
> >   }
> >
> >   class JsonMapper extends MapFunction[String, ObjectNode] {
> >     override def map(value: String): ObjectNode =
> >       JsonMapper.mapper.readValue(value, classOf[ObjectNode])
> >   }
> >
> > Is using a static reference to ObjectMapper fine, or will this cause issues on a distributed cluster / with checkpoints / when serializing state / whatever?
> >
> > Or should I instead use a non-transient property initialized in the constructor (ObjectMapper is java.io.Serializable)?
> >
> > Or should I initialize it in RichMapFunction.open and store it in a transient property?
> >
> > Also I am wondering if replacing 'class' with 'object' (=> singleton)
> >
> >   object JsonMapper extends MapFunction[String, ObjectNode] { /* .. */ }
> >
> > is OK (actually the mapper is stateless, so there is no obvious need to re-instantiate it again and again?)
> >
> > Thanks and best regards
> > Peter
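For the RichMapFunction.open option Peter asks about (which also matches Nico's suggestion of a non-static member), here is a minimal sketch. It assumes the Flink 1.x `open(Configuration)` signature used in the Flink 1.3 era of this thread (newer Flink releases deprecate it in favor of an `OpenContext` variant) and jackson-databind on the classpath. `JsonParsingMapper` is a hypothetical name.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Hypothetical name. Each parallel instance builds its own mapper in open(),
// which Flink calls after the function has been deserialized on the
// TaskManager and before the first map() call.
class JsonParsingMapper extends RichMapFunction[String, ObjectNode] {
  // @transient: the mapper is not serialized together with the function;
  // it is recreated per instance in open().
  @transient private var mapper: ObjectMapper = _

  override def open(parameters: Configuration): Unit = {
    mapper = new ObjectMapper()
  }

  // Parses one JSON record into a tree node.
  override def map(value: String): ObjectNode =
    mapper.readValue(value, classOf[ObjectNode])
}
```

Compared with the companion-object version, this gives one mapper per parallel instance rather than one shared by every slot in a TaskManager JVM, so thread safety is never in question; the cost is one extra mapper per task, created once rather than per record.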