Interesting. Is there a quirk in Scala whereby using multiple lazy vals can result in some of them being initialized eagerly?
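For reference, Scala lazy vals are not initialized eagerly. Each one is computed on first access and then stored in an ordinary backing field. Java serialization, which Flink's ClosureCleaner relies on (InstantiationUtil.serializeObject in the stack trace quoted below), writes that field as it currently stands. So a lazy val is harmless while it is still unset, and serialization fails once something has forced it before the function is handed to Flink. The sketch below is a plain-JDK illustration of that behaviour, not a reproduction of the original test. FakeCounter is a hypothetical stand-in for Flink's SimpleCounter, and the class names are made up for illustration.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for org.apache.flink.metrics.SimpleCounter: deliberately NOT Serializable.
final class FakeCounter {
  private var n = 0L
  def inc(): Unit = n += 1
  def getCount: Long = n
}

// Mirrors the original function: a plain lazy val backed by a regular field.
class PlainLazy extends Serializable {
  lazy val counter = new FakeCounter
}

// Same, but @transient tells Java serialization to skip the backing field.
class TransientLazy extends Serializable {
  @transient lazy val counter = new FakeCounter
}

object LazySerializationDemo {
  // Returns true if Java serialization of obj succeeds.
  def serializes(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val plain = new PlainLazy
    println(serializes(plain)) // true: counter not yet forced, backing field is null
    plain.counter.inc()        // first access initializes the lazy val
    println(serializes(plain)) // false: backing field now holds a FakeCounter

    val transientOne = new TransientLazy
    transientOne.counter.inc()
    println(serializes(transientOne)) // true: the @transient field is not written
  }
}
```

In other words, the number of lazy vals does not change when they are initialized; what matters is whether any of them has been accessed on the instance that is later serialized.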
On Mon, Oct 9, 2017 at 4:37 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
> Hi Colin,
>
> Are you initializing your counters from within the open() method of your
> rich function? In other words, are you calling
>
> counter = getRuntimeContext.getMetricGroup.counter("my counter")
>
> from within open()?
>
> The Counter interface is not serializable. So if you instantiate the
> counters outside open(), Flink cannot serialize your function when it
> tries to ship your code to the cluster, and you get the exception.
>
> You can have a look at the docs for an example:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html
>
> Thanks,
> Kostas
>
> On Oct 7, 2017, at 11:34 PM, Colin Williams <colin.williams.seattle@gmail.com> wrote:
>
> I've created a RichMapFunction in Scala with multiple counters like:
>
> lazy val successCounter = getRuntimeContext.getMetricGroup.counter("successfulParse")
> lazy val failedCounter = getRuntimeContext.getMetricGroup.counter("failedParse")
> lazy val errorCounter = getRuntimeContext.getMetricGroup.counter("errorParse")
>
> which I increment in the map function. While testing I noticed that I have
> no issues with a single counter. However, with more than one counter I get
> a serialization error.
>
> Does anyone know how I can use multiple counters from my RichMapFunction,
> or what I'm doing wrong?
>
> [info] org.apache.flink.api.common.InvalidProgramException: The implementation of the RichMapFunction is not serializable. The object probably contains or references non serializable fields.
> [info] at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
> [info] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
> [info] at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
> [info] at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
> [info] at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:581)
> [info] at ParsedResultUnwrapperTest$$anonfun$2.apply(ParsedResultUnwrapperTest.scala:27)
> [info] at ParsedResultUnwrapperTest$$anonfun$2.apply(ParsedResultUnwrapperTest.scala:23)
> [info] at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> [info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> [info] at org.scalatest.Transformer.apply(Transformer.scala:22)
> [info] ...
> [info] Cause: java.io.NotSerializableException: org.apache.flink.metrics.SimpleCounter
> [info] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> [info] at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> [info] at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> [info] at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> [info] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> [info] at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> [info] at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
> [info] at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
> [info] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
> [info] at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
> [info] ...
> [info] - ParseResultUnwrapper.errorCounter.getCount should return 1L for a Error -> ParseResult[LineProtocol] *** FAILED ***
> [info] org.apache.flink.api.common.InvalidProgramException: The implementation of the RichMapFunction is not serializable. The object probably contains or references non serializable fields.
> [info] at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
> [info] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
> [info] at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
> [info] at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
> [info] at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:581)
> [info] at ParsedResultUnwrapperTest$$anonfun$3.apply(ParsedResultUnwrapperTest.scala:37)
> [info] at ParsedResultUnwrapperTest$$anonfun$3.apply(ParsedResultUnwrapperTest.scala:32)
> [info] at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> [info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> [info] at org.scalatest.Transformer.apply(Transformer.scala:22)
> [info] ...
> [info] Cause: java.io.NotSerializableException: org.apache.flink.metrics.SimpleCounter
> [info] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> [info] at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> [info] at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> [info] at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> [info] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> [info] at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> [info] at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
> [info] at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
> [info] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
> [info] at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
> [info] ...
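Following Kostas's suggestion, here is a minimal sketch of the three counters from the original post created in open() instead of as lazy vals. It assumes the Flink 1.x RichMapFunction API with open(Configuration), as in the 1.3 release linked in the thread, and flink-streaming-scala on the classpath. The class name ParseCounterMapper and the rule for what counts as a successful, failed, or errored parse are hypothetical, since the original ParseResult types are not shown.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter

import scala.util.{Failure, Success, Try}

// Hypothetical mapper: parses each line as a Long and counts the outcomes.
// Counter names match the original post; the parsing rules are illustrative only.
class ParseCounterMapper extends RichMapFunction[String, Option[Long]] {
  // @transient keeps the non-serializable Counter objects out of the
  // serialized function; they are still null when Flink ships it.
  @transient private var successCounter: Counter = _
  @transient private var failedCounter: Counter = _
  @transient private var errorCounter: Counter = _

  // open() runs on each parallel instance before any call to map(),
  // so the runtime context and its metric group are available here.
  override def open(parameters: Configuration): Unit = {
    val metrics = getRuntimeContext.getMetricGroup
    successCounter = metrics.counter("successfulParse")
    failedCounter = metrics.counter("failedParse")
    errorCounter = metrics.counter("errorParse")
  }

  override def map(line: String): Option[Long] = {
    val trimmed = line.trim
    if (trimmed.isEmpty) {
      failedCounter.inc() // hypothetical rule: blank input is a failed parse
      None
    } else {
      Try(trimmed.toLong) match {
        case Success(n) => successCounter.inc(); Some(n)
        case Failure(_) => errorCounter.inc(); None // hypothetical rule: non-numeric input is an error
      }
    }
  }
}
```

With the Scala DataStream API this would be used as `stream.map(new ParseCounterMapper)`, with `import org.apache.flink.streaming.api.scala._` in scope so the `Option[Long]` type information can be derived.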