Re: [DISCUSS] Drop Scala 2.11
That makes sense. We are using 2.12 in production. Also, Flink's Scala 2.12 support is in fact limited to Scala 2.12.7: it is binary incompatible with 2.12.8 and above (https://issues.apache.org/jira/browse/FLINK-12461). It would be great to at least move to a more recent 2.12 version, and ideally to 2.13. Is there a Scala support plan available?

Matthieu

On Thu, Sep 10, 2020 at 5:00 PM Aljoscha Krettek wrote:
> Yes! I would be in favour of this since it's blocking us from upgrading
> certain dependencies.
>
> I would also be in favour of dropping Scala completely but that's a
> different story.
>
> Aljoscha
>
> On 10.09.20 16:51, Seth Wiesman wrote:
> > Hi Everyone,
> >
> > Think of this as a pre-FLIP, but what does everyone think about dropping
> > Scala 2.11 support from Flink?
> >
> > The last 2.11 patch release was in 2017, and in that time the Scala
> > community has released 2.13 and is working towards a 3.0 release. Apache
> > Kafka and Spark have both dropped 2.11 support in recent versions. In
> > fact, Flink's universal Kafka connector is stuck on Kafka 2.4 because
> > that is the last version with Scala 2.11 support.
> >
> > What are people's thoughts on dropping Scala 2.11? How many are still
> > using it in production?
> >
> > Seth

--
Matthieu Bonneviot
Senior R&D Engineer, DataDome
M +33 7 68 29 79 34
E matthieu.bonnev...@datadome.co
W www.datadome.co
Flink runtime incompatibility with Java 9 or above due to Akka version
Hi,

I am working on migrating my Flink cluster to 1.7.1 with Java 11. I am facing a runtime error in the taskmanager:

2019-01-24 14:43:37,014 ERROR akka.remote.Remoting - class [B cannot be cast to class [C ([B and [C are in module java.base of loader 'bootstrap')
java.lang.ClassCastException: class [B cannot be cast to class [C ([B and [C are in module java.base of loader 'bootstrap')
    at akka.remote.artery.FastHash$.ofString(LruBoundedCache.scala:18)
    at akka.remote.serialization.ActorRefResolveCache.hash(ActorRefResolveCache.scala:61)

This error is due to a code optimization in Akka which no longer works from Java 9 onward. I have opened an issue with more details: https://issues.apache.org/jira/browse/FLINK-11431

Are you facing the same issue?

Regards,
Matthieu Bonneviot
[jira] [Created] (FLINK-11431) Akka dependency not compatible with java 9 or above
Matthieu Bonneviot created FLINK-11431:
--
Summary: Akka dependency not compatible with java 9 or above
Key: FLINK-11431
URL: https://issues.apache.org/jira/browse/FLINK-11431
Project: Flink
Issue Type: Bug
Components: Core
Affects Versions: 1.7.1
Reporter: Matthieu Bonneviot

Running a jobmanager with Java 11 fails with the following call stack:

2019-01-24 14:43:52,059 ERROR akka.remote.Remoting - class [B cannot be cast to class [C ([B and [C are in module java.base of loader 'bootstrap')
java.lang.ClassCastException: class [B cannot be cast to class [C ([B and [C are in module java.base of loader 'bootstrap')
    at akka.remote.artery.FastHash$.ofString(LruBoundedCache.scala:18)
    at akka.remote.serialization.ActorRefResolveCache.hash(ActorRefResolveCache.scala:61)
    at akka.remote.serialization.ActorRefResolveCache.hash(ActorRefResolveCache.scala:55)
    at akka.remote.artery.LruBoundedCache.getOrCompute(LruBoundedCache.scala:110)
    at akka.remote.RemoteActorRefProvider.resolveActorRef(RemoteActorRefProvider.scala:403)
    at akka.actor.SerializedActorRef.readResolve(ActorRef.scala:433)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1250)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2096)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1594)

Flink master is using Akka 2.4.20. After some investigation, the error in Akka comes from the following line:

def ofString(s: String): Int = {
  val chars = Unsafe.instance.getObject(s, EnvelopeBuffer.StringValueFieldOffset).asInstanceOf[Array[Char]]

From Java 9 onward, the internal string value is an array of bytes, not an array of chars.

The Akka code in the newer version is:

public static int fastHash(String str) {
    ...
    if (isJavaVersion9Plus) {
        final byte[] chars = (byte[]) instance.getObject(str, stringValueFieldOffset);
        ...
    } else {
        final char[] chars = (char[]) instance.getObject(str, stringValueFieldOffset);
        ...
    }
}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
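The version gate shown above can be sketched in a self-contained way. This is a minimal approximation: the real Akka code additionally reads the String's internal field through sun.misc.Unsafe, which is omitted here.

```java
public class CompactStringCheck {

    // Pre-9 JVMs report "1.8", "1.7", ... for this property; 9+ report "9", "10", "11", ...
    static boolean isJavaVersion9Plus() {
        return !System.getProperty("java.specification.version").startsWith("1.");
    }

    public static void main(String[] args) {
        // From Java 9 (JEP 254, compact strings) String stores its characters in a
        // byte[], so code casting the internal value field to char[] breaks exactly
        // as in the stack trace above and must branch on the runtime version.
        System.out.println(isJavaVersion9Plus()
                ? "String backed by byte[]"
                : "String backed by char[]");
    }
}
```

This is why the upgrade path for FLINK-11431 is moving to an Akka version that contains the branching shown above.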
Re: issue in the MetricReporterRegistry
Yes indeed, it affects the behavior on Java 11. I have created a bug in JIRA about it:

Summary: MetricReporter: "metrics.reporters" configuration has to be provided for reporters to be taken into account
Key: FLINK-11413
URL: https://issues.apache.org/jira/browse/FLINK-11413
Project: Flink
Issue Type: Bug
Components: Configuration
Affects Versions: 1.7.1

I will have time to fix it and submit a PR.

Regards,
Matthieu Bonneviot

On Wed, Jan 23, 2019 at 10:41 AM, Chesnay Schepler wrote:
> nvm, it does indeed affect behavior :/
>
> On 23.01.2019 10:08, Chesnay Schepler wrote:
> > Just to make sure, this issue does not actually affect the behavior,
> > does it? Since we only use these as a filter for reporters to activate.
> >
> > On 21.01.2019 18:22, Matthieu Bonneviot wrote:
> >> Hi,
> >>
> >> I don't have the JIRA permission, but if you grant it to me I could
> >> contribute a fix for the following issue:
> >> When using Java 11, the "metrics.reporters" configuration has to be
> >> provided for reporters to be taken into account.
> >>
> >> The desired behavior: MetricRegistryConfiguration looks for a setting
> >> like "metrics.reporters = foo,bar"; if it is not found, all reporters
> >> found in the configuration are started.
> >>
> >> In the code this is done by:
> >> Set<String> includedReporters =
> >>     reporterListPattern.splitAsStream(includedReportersString).collect(Collectors.toSet());
> >> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java#L134
> >>
> >> Definition of splitAsStream: "If this pattern does not match any
> >> subsequence of the input then the resulting stream has just one
> >> element, namely the input sequence in string form."
> >> It means reporterListPattern.splitAsStream("") should return "", and so
> >> includedReporters should have size 1 with "" as its unique element.
> >>
> >> However, some versions of Java 8 misbehave and return an empty stream.
> >> With Java 11, the subsequent check no longer works:
> >> if (includedReporters.isEmpty() || includedReporters.contains(reporterName))
> >> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java#L145
> >>
> >> I would suggest filtering out the empty string:
> >> Set<String> includedReporters =
> >>     reporterListPattern.splitAsStream(includedReportersString)
> >>         .filter(s -> !s.isEmpty())
> >>         .collect(Collectors.toSet());
> >>
> >> Regards,
> >> Matthieu Bonneviot
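The Java-version difference described in the thread above is easy to reproduce in isolation. A minimal sketch, run on Java 9+ where splitAsStream follows its documented contract:

```java
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class SplitAsStreamDemo {
    public static void main(String[] args) {
        Pattern reporterListPattern = Pattern.compile(",");

        // Per the javadoc, splitting "" yields exactly one element: the empty
        // string. (Some Java 8 builds returned an empty stream instead.)
        Set<String> unfiltered = reporterListPattern.splitAsStream("")
                .collect(Collectors.toSet());
        System.out.println(unfiltered.size());  // 1 on Java 9+

        // The suggested fix drops the empty token, so an absent or empty
        // configuration value again produces an empty set.
        Set<String> filtered = reporterListPattern.splitAsStream("")
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
        System.out.println(filtered.size());    // 0
    }
}
```

With the unfiltered set, `includedReporters.isEmpty()` is false on Java 9+, which is exactly why the reporters were no longer activated.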
Re: Applying for Flink contributor permission
Thanks a lot.

On Wed, Jan 23, 2019 at 10:06 AM, Robert Metzger wrote:
> Hey Matthieu,
>
> welcome to the Flink community!
> I've added you as a contributor to our JIRA! Happy coding :)
>
> On Wed, Jan 23, 2019 at 9:39 AM Matthieu Bonneviot <
> matthieu.bonnev...@datadome.co> wrote:
> > Hi,
> >
> > Please grant me contributor permission.
> > email: matthieu.bonnev...@datadome.co
> > apache-username: mbonneviot
> >
> > Thank you
[jira] [Created] (FLINK-11413) MetricReporter: "metrics.reporters" configuration has to be provided for reporters to be taken into account
Matthieu Bonneviot created FLINK-11413:
--
Summary: MetricReporter: "metrics.reporters" configuration has to be provided for reporters to be taken into account
Key: FLINK-11413
URL: https://issues.apache.org/jira/browse/FLINK-11413
Project: Flink
Issue Type: Bug
Components: Configuration
Affects Versions: 1.7.1
Reporter: Matthieu Bonneviot

When using Java 11, the "metrics.reporters" configuration has to be provided for reporters to be taken into account.

The desired behavior: MetricRegistryConfiguration looks for a setting like "metrics.reporters = foo,bar"; if it is not found, all reporters found in the configuration are started.

In the code this is done by:

Set<String> includedReporters =
    reporterListPattern.splitAsStream(includedReportersString).collect(Collectors.toSet());

https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java#L134

Definition of splitAsStream: "If this pattern does not match any subsequence of the input then the resulting stream has just one element, namely the input sequence in string form."
It means reporterListPattern.splitAsStream("") should return "", and so includedReporters should have size 1 with "" as its unique element.

However, some versions of Java 8 misbehave and return an empty stream. With Java 11, the subsequent check no longer works:

if (includedReporters.isEmpty() || includedReporters.contains(reporterName))

https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java#L145

I would suggest filtering out the empty string:

Set<String> includedReporters =
    reporterListPattern.splitAsStream(includedReportersString)
        .filter(s -> !s.isEmpty())
        .collect(Collectors.toSet());
Applying for Flink contributor permission
Hi,

Please grant me contributor permission.
email: matthieu.bonnev...@datadome.co
apache-username: mbonneviot

Thank you
issue in the MetricReporterRegistry
Hi,

I don't have the JIRA permission, but if you grant it to me I could contribute a fix for the following issue:
When using Java 11, the "metrics.reporters" configuration has to be provided for reporters to be taken into account.

The desired behavior: MetricRegistryConfiguration looks for a setting like "metrics.reporters = foo,bar"; if it is not found, all reporters found in the configuration are started.

In the code this is done by:

Set<String> includedReporters =
    reporterListPattern.splitAsStream(includedReportersString).collect(Collectors.toSet());

https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java#L134

Definition of splitAsStream: "If this pattern does not match any subsequence of the input then the resulting stream has just one element, namely the input sequence in string form."
It means reporterListPattern.splitAsStream("") should return "", and so includedReporters should have size 1 with "" as its unique element.

However, some versions of Java 8 misbehave and return an empty stream. With Java 11, the subsequent check no longer works:

if (includedReporters.isEmpty() || includedReporters.contains(reporterName))

https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java#L145

I would suggest filtering out the empty string:

Set<String> includedReporters =
    reporterListPattern.splitAsStream(includedReportersString)
        .filter(s -> !s.isEmpty())
        .collect(Collectors.toSet());

Regards,
Matthieu Bonneviot
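To put the suggested filter into context, here is a minimal, self-contained sketch of the activation check. The method name isIncluded and the configuration strings are hypothetical stand-ins, not Flink's actual identifiers:

```java
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class ReporterFilterDemo {

    private static final Pattern SPLIT = Pattern.compile("\\s*,\\s*");

    // With the fix, an empty include list means "activate every configured reporter".
    static boolean isIncluded(String includedReportersString, String reporterName) {
        Set<String> included = SPLIT.splitAsStream(includedReportersString)
                .filter(s -> !s.isEmpty())   // the suggested fix: drop empty tokens
                .collect(Collectors.toSet());
        return included.isEmpty() || included.contains(reporterName);
    }

    public static void main(String[] args) {
        System.out.println(isIncluded("", "jmx"));        // true: no explicit list
        System.out.println(isIncluded("foo,bar", "jmx")); // false: not listed
        System.out.println(isIncluded("foo,jmx", "jmx")); // true: listed
    }
}
```

Without the filter, the first call would return false on Java 9+, which matches the reported symptom.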
Re: Enrich testing doc with more unit test examples using AbstractStreamOperator
Hi,

I am a new Flink developer and I also feel we need to improve the testing side of Flink.

From Tony's list:

1 - stream logic control: we may consider ReactiveX as a reference for stream testing: http://reactivex.io/. They have a test scheduler and use marble diagrams to write unit tests: for example '--a--b--|' represents event a arriving at time slot 3, event b arriving at time slot 6, and an end of stream at time slot 9. The same diagram can be used for the expected events at the output of our stream. It is very efficient.

3 - state testing: I also feel there is a lack of checkpoint testing. On my side, checkpointing is mostly observed at runtime.

I would also suggest, from a design and testing point of view, using less reflection and more interfaces. Interfaces are easier to test and to mock.

Quick description of the use case I had: we run Flink to stream web events, more than 1 billion a day. Those events are transformed by Flink and pushed to Kafka, using FlinkKafkaProducer010. From time to time the Kafka leader node goes down, which raises an exception in Flink and stops the stream. We don't want that behavior, so I implemented a decorator to catch the exception and recreate the producer.

From a design and testing point of view, addSink takes a "SinkFunction[T]", so it seemed I should decorate the SinkFunction interface. I wrote the code and the tests; both ran fine, until I went to runtime and discovered nothing was being pushed to Kafka. The interface does not describe the actual runtime behavior, so a test that mocks a SinkFunction throwing an exception passes without catching the real problem. The working decorator and its tests are much more complicated, as they rely on extending several classes rather than implementing an interface.

Regards,
Matthieu Bonneviot

On Mon, Oct 8, 2018 at 3:47 AM, Tony Wei wrote:
>
> Hi all,
>
> Does anyone want to share something about how to provide better
> testing tools or good documentation?
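The decorator idea described above can be sketched as follows. This is a minimal, self-contained approximation: the Sink interface here is a hypothetical stand-in for Flink's SinkFunction, and the retry-once policy is illustrative rather than the production logic.

```java
import java.util.function.Supplier;

public class RetryingSinkDemo {

    /** Hypothetical stand-in for Flink's SinkFunction interface. */
    interface Sink<T> {
        void invoke(T value) throws Exception;
    }

    /** Decorator: on failure, recreate the delegate sink and retry once. */
    static class RetryingSink<T> implements Sink<T> {
        private final Supplier<Sink<T>> factory; // recreates the underlying producer
        private Sink<T> delegate;

        RetryingSink(Supplier<Sink<T>> factory) {
            this.factory = factory;
            this.delegate = factory.get();
        }

        @Override
        public void invoke(T value) throws Exception {
            try {
                delegate.invoke(value);
            } catch (Exception e) {
                // e.g. a Kafka leader going down: rebuild the producer, retry once
                delegate = factory.get();
                delegate.invoke(value);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // A sink that fails on its first invocation only.
        int[] calls = {0};
        Supplier<Sink<String>> factory = () -> value -> {
            if (calls[0]++ == 0) throw new RuntimeException("leader down");
            System.out.println("sent " + value);
        };
        new RetryingSink<>(factory).invoke("event-1");
    }
}
```

The pain point from the email remains visible even in this sketch: the test only exercises the exception path the mock was told to produce, so it says nothing about whether a real producer behind the interface actually delivers records.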
> @Till, @Aljoscha
> Do you have further suggestions about improvements to the testing tools and
> what the next step is that we can take?
>
> Best,
> Tony Wei
>
> Tony Wei wrote on Sat, Sep 29, 2018 at 12:20 AM:
>
> > Hi Till,
> >
> > Thanks for your feedback. I didn't think about this before. It would be
> > better if we could provide such tools instead of letting users deal with
> > testing at the operator level directly. Because the test harness was
> > introduced for contributors, who may have more knowledge of the internal
> > design, to test their patches easily, it is sometimes not intuitive for
> > users. That's why I want to provide some examples in the docs to ease the
> > pain in the beginning.
> >
> > If Aljoscha has already made some efforts on it and such a tool becomes
> > available in the near future, I look forward to seeing this happen. And
> > if there is anything I can help with, I'm glad to give a hand.
> >
> > To follow up the discussion about users' requirements, I can provide some
> > from my experience. What I want to achieve is to do unit testing on my
> > operators with Flink features, such as states, event time and
> > checkpoints.
> >
> > From the testing doc [1] I provided, I concluded some scenarios I would
> > test in my Flink applications:
> >
> >    1. Easily control stream records to see the changes to states and
> >    outputs, whether the operator has one input stream or two.
> >    2. Manually control watermark progress to test "out of order"
> >    problems, "late data" problems, or the behavior of the event time
> >    service.
> >    3. Preserve states from every version for testing state evolution and
> >    compatibility, or just to test a change of Flink version.
> >    4. Test customized stateful source functions. In the current version,
> >    a source function still needs to be implemented as a long-running
> >    method; an easy way to block it and verify the states would be
> >    helpful.
> >    5. A simple way to expose states to verify the exact value stored in
> >    them, instead of testing it indirectly through outputs. I achieved
> >    this by getting the state backend from the test harness and referring
> >    to some examples from the Flink project to access the keyed states I
> >    needed, but that goes too deep into the internal implementation to
> >    make it an example in my testing doc [1].
> >
> > Best,
> > Tony Wei
> >
> > [1] https://github.com/apache/flink/compare/master...tony810430:flink-testing-doc
> >
> > 2018-09-28 21:48 GMT+08:00 Till Rohrmann :
> >
> >> Hi Tony,
> >>
> >> I think this is a lo