Interesting. Is there a quirk in Scala whereby using multiple lazy vals can
result in some of them being initialized eagerly?
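One way to look at the lazy-val question: Scala does not initialize a lazy val eagerly. It is backed by a field that stays null until the first access, so Java serialization succeeds only as long as nothing has touched it yet. Any access before the function is serialized (for example, a test reading a counter on the client side) would store the counter in that field. The sketch below shows this with plain Java serialization; `FakeCounter` is a hypothetical stand-in for Flink's non-serializable `SimpleCounter`, and `Mapper`, `LazyValDemo` and `serializes` are illustrative names, not Flink API.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable metric such as Flink's SimpleCounter.
class FakeCounter {
  private var n = 0L
  def inc(): Unit = n += 1
  def getCount: Long = n
}

// Serializable holder with lazy vals, shaped like the RichMapFunction in the question.
class Mapper extends Serializable {
  lazy val successCounter: FakeCounter = new FakeCounter
  lazy val failedCounter: FakeCounter = new FakeCounter
}

object LazyValDemo {
  // Returns true if Java serialization of `obj` succeeds.
  def serializes(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val m = new Mapper
    println(serializes(m)) // true: no lazy val has been accessed, both fields are null
    m.failedCounter.inc()  // first access initializes only failedCounter
    println(serializes(m)) // false: a FakeCounter is now stored in a field
  }
}
```

So "one counter works, several fail" may simply reflect which counters happened to be accessed before the function was shipped.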

On Mon, Oct 9, 2017 at 4:37 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:

> Hi Colin,
>
> Are you initializing your counters from within the open() method of your
> rich function?
> In other words, are you calling
>
> counter = getRuntimeContext.getMetricGroup.counter("my counter")
>
> from within open()?
>
> The Counter interface is not serializable. So if you instantiate the
> counters outside open(), Flink cannot serialize your function when it
> tries to ship your code to the cluster, and you get the exception.
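The lifecycle described here can be modelled without Flink: the function object is serialized on the client, deserialized on a worker, and only then is open() called. This is a simplified, hypothetical sketch (not Flink's actual code): `ParseMapper`, `FakeCounter`, `open()` and `roundTrip` are illustrative names. The counters are `@transient var`s that are still null when the object is serialized and are created in `open()`.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.util.Try

// Hypothetical stand-in for a non-serializable metric such as Flink's SimpleCounter.
class FakeCounter {
  private var n = 0L
  def inc(): Unit = n += 1
  def getCount: Long = n
}

// Simplified, hypothetical model of a rich function: counters are transient and
// created in open(), which runs after the object has been shipped and deserialized.
class ParseMapper extends Serializable {
  @transient var successCounter: FakeCounter = null
  @transient var failedCounter: FakeCounter = null

  def open(): Unit = {
    successCounter = new FakeCounter
    failedCounter = new FakeCounter
  }

  def map(line: String): Option[Int] = {
    val parsed = Try(line.trim.toInt).toOption
    if (parsed.isDefined) successCounter.inc() else failedCounter.inc()
    parsed
  }
}

object OpenLifecycleDemo {
  // Serialize and deserialize, roughly what shipping a function to a worker involves.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val shipped = roundTrip(new ParseMapper) // succeeds: the counters are still null
    shipped.open()                           // counters are created on the "worker" side
    Seq("1", "x", "2").foreach(line => shipped.map(line))
    // prints success=2 failed=1
    println(s"success=${shipped.successCounter.getCount} failed=${shipped.failedCounter.getCount}")
  }
}
```

In an actual Flink job the same shape would be `@transient private var successCounter: Counter = _` fields on a `RichMapFunction`, assigned from `getRuntimeContext.getMetricGroup.counter(...)` inside `override def open(parameters: Configuration)`.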
>
> You can have a look at the docs for an example:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html
>
> Thanks,
> Kostas
>
> On Oct 7, 2017, at 11:34 PM, Colin Williams <colin.williams.seattle@gmail.com> wrote:
>
> I've created a RichMapFunction in Scala with multiple counters, like:
>
>    lazy val successCounter = getRuntimeContext.getMetricGroup.counter("successfulParse")
>    lazy val failedCounter = getRuntimeContext.getMetricGroup.counter("failedParse")
>    lazy val errorCounter = getRuntimeContext.getMetricGroup.counter("errorParse")
>
> which I increment in the map function. While testing, I noticed that a
> single counter works fine, but as soon as I use more than one counter I get
> a serialization error.
>
> Does anyone know how I can use multiple counters from my RichMapFunction,
> or what I'm doing wrong?
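If you prefer to keep the lazy vals, a common Scala idiom is to mark them `@transient`, so the cached counter is skipped by serialization even after it has been forced. This sketch assumes Scala 2.x semantics, where the lazy val's initialization flag is also transient, so the value is recomputed on first access after deserialization (with Flink, that would happen on the worker, where the runtime context is available). `TransientMapper`, `FakeCounter` and `TransientLazyDemo` are hypothetical names, not Flink classes.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable metric such as Flink's SimpleCounter.
class FakeCounter {
  private var n = 0L
  def inc(): Unit = n += 1
  def getCount: Long = n
}

// Same lazy vals as in the question, but @transient: the cached value is not serialized.
class TransientMapper extends Serializable {
  @transient lazy val successCounter: FakeCounter = new FakeCounter
  @transient lazy val failedCounter: FakeCounter = new FakeCounter
  @transient lazy val errorCounter: FakeCounter = new FakeCounter
}

object TransientLazyDemo {
  // Returns true if Java serialization of `obj` succeeds.
  def serializes(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val m = new TransientMapper
    // Force all three lazy vals, as a client-side access would.
    m.successCounter.inc()
    m.failedCounter.inc()
    m.errorCounter.inc()
    println(serializes(m)) // true under Scala 2.x: the forced counters are skipped
  }
}
```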
>
> [info]   org.apache.flink.api.common.InvalidProgramException: The implementation of the RichMapFunction is not serializable. The object probably contains or references non serializable fields.
> [info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
> [info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
> [info]   at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
> [info]   at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:581)
> [info]   at ParsedResultUnwrapperTest$$anonfun$2.apply(ParsedResultUnwrapperTest.scala:27)
> [info]   at ParsedResultUnwrapperTest$$anonfun$2.apply(ParsedResultUnwrapperTest.scala:23)
> [info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
> [info]   ...
> [info]   Cause: java.io.NotSerializableException: org.apache.flink.metrics.SimpleCounter
> [info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> [info]   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> [info]   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> [info]   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> [info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> [info]   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> [info]   at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
> [info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
> [info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
> [info]   ...
> [info] - ParseResultUnwrapper.errorCounter.getCount should return 1L for a Error -> ParseResult[LineProtocol] *** FAILED ***
> [info]   org.apache.flink.api.common.InvalidProgramException: The implementation of the RichMapFunction is not serializable. The object probably contains or references non serializable fields.
> [info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
> [info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
> [info]   at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
> [info]   at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:581)
> [info]   at ParsedResultUnwrapperTest$$anonfun$3.apply(ParsedResultUnwrapperTest.scala:37)
> [info]   at ParsedResultUnwrapperTest$$anonfun$3.apply(ParsedResultUnwrapperTest.scala:32)
> [info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
> [info]   ...
> [info]   Cause: java.io.NotSerializableException: org.apache.flink.metrics.SimpleCounter
> [info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> [info]   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> [info]   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> [info]   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> [info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> [info]   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> [info]   at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
> [info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
> [info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
> [info]   ...