Re: Clean shutdown of streaming job
Hi Ning,

I don't think it is possible to pause a Kafka source upon taking a savepoint without making changes to the implementation. I think your actual problem is that the Cassandra sink doesn't support exactly-once guarantees when the Cassandra query isn't idempotent. If possible, the cleanest solution would be to implement a new Cassandra sink, or extend the existing one, with the https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html interface, and to set your environment to the exactly-once guarantee.

--
Niels

On Mon, Oct 22, 2018 at 1:26 AM Ning Shi wrote:
> I'm implementing a streaming job that consumes events from Kafka and
> writes results to Cassandra. The writes to Cassandra are not
> idempotent. In preparation for planned maintenance events like a Flink
> version upgrade or job update, I'd like to be able to shut down the job
> cleanly.
>
> I understand that cancelling with savepoint is currently not an atomic
> operation, meaning that there may be one or more events processed
> after the savepoint is taken. In order to prevent any writes from going to
> Cassandra after the savepoint is taken, I wonder if it's possible to
> pause the Kafka stream before I take the savepoint.
>
> I've seen people suggest using a control stream and unioning it with the
> Kafka stream to achieve this, but that doesn't actually stop the Kafka
> consumer offsets from advancing. I'm concerned that if I send the signal
> to the control stream and start to drop messages from Kafka, the offsets
> may still advance and the new offsets will be included in the
> savepoint. As a result, recovering from the savepoint would cause data
> loss.
>
> Is there any way to cleanly shut down a job or pause the Kafka source
> prior to taking a savepoint?
>
> Thanks,
>
> --
> Ning
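[Editor's note] For what it's worth, here is a toy sketch of the two-phase commit protocol that the TwoPhaseCommitSinkFunction interface suggested above is built around. This is plain Java with invented class and method names, not the actual Flink API: writes are buffered in an open transaction, pre-committed when a checkpoint is taken, and only made externally visible once the checkpoint completes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a two-phase-commit sink; not Flink code. In Flink you
// would extend TwoPhaseCommitSinkFunction, which drives equivalent
// callbacks from the checkpoint lifecycle.
class ToyTwoPhaseSink {
    private List<String> currentTxn = new ArrayList<>();             // open transaction
    private final Map<Long, List<String>> pending = new HashMap<>(); // pre-committed, keyed by checkpoint id
    final List<String> committed = new ArrayList<>();                // what external readers see

    void invoke(String record) {                 // writes go into the open transaction
        currentTxn.add(record);
    }

    void preCommit(long checkpointId) {          // checkpoint taken: flush and start a new txn
        pending.put(checkpointId, currentTxn);
        currentTxn = new ArrayList<>();
    }

    void notifyCheckpointComplete(long checkpointId) { // second phase: make visible
        List<String> txn = pending.remove(checkpointId);
        if (txn != null) committed.addAll(txn);
    }

    void abort() {                               // failure before commit: drop buffered writes
        currentTxn.clear();
    }
}
```

The key property for the non-idempotent Cassandra case is that nothing reaches `committed` until the corresponding checkpoint has completed, so a replay from that checkpoint never double-applies a visible write.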
Re: Manual trigger the window in fold operator or incremental aggregation
Sorry, I wouldn't know that. I have worked with custom triggers, but haven't had to implement a custom window function yet. Looking at the interfaces, I would say that is not possible.

Niels

On Wed, Oct 17, 2018 at 2:18 PM Ahmad Hassan wrote:
> Hi Niels,
>
> Can we distinguish within the apply function of a 'RichWindowFunction' whether
> it was called due to an onElement trigger call or an onProcessingTime trigger
> call of a custom Trigger?
>
> Thanks!
>
> On Wed, 17 Oct 2018 at 12:51, Niels van Kaam wrote:
>
>> Hi Zhen Li,
>>
>> You can control when a windowed stream emits data with "Triggers". See:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#triggers
>>
>> Flink comes with a couple of default triggers, but you can also create
>> your own by implementing
>> https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.html.
>>
>> Note that this does not change the window itself; it just causes the
>> windowed stream to emit intermediate results to the next operator.
>>
>> Does this answer your question?
>>
>> Cheers,
>> Niels
>>
>> On Wed, Oct 17, 2018 at 12:34 PM zhen li wrote:
>>
>>> Hi all:
>>> How can I trigger the window manually in a fold operator or
>>> incremental aggregation? For example, when some condition is met, although
>>> the count window or time window has not been reached?
Re: Checkpointing when one of the sources has completed
Hi All,

Thanks for the responses; the finished source explains my issue then. I can work around the problem by letting my sources negotiate a "final" checkpoint via ZooKeeper.

@Paul, I think your answer was meant for the earlier question asked by Joshua?

Cheers,
Niels

On Wed, Oct 17, 2018 at 11:15 AM Joshua Fan wrote:
> Hi Niels,
>
> Probably not. An operator does not begin its checkpoint until it has received the
> barriers from all upstream sources, so if one source cannot send a barrier,
> the downstream operator cannot checkpoint, FYI.
>
> Yours sincerely,
> Joshua
>
> On Wed, Oct 17, 2018 at 4:58 PM Niels van Kaam wrote:
>
>> Hi All,
>>
>> I am debugging an issue where the periodic checkpointing has halted. I
>> noticed that one of the sources of my job has completed (FINISHED). The
>> other sources and operators would, however, still be able to produce output.
>>
>> Does anyone know if Flink's periodic checkpoints are supposed to continue
>> when one or more sources of a job are in the "FINISHED" state?
>>
>> Cheers,
>> Niels
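[Editor's note] Joshua's explanation can be illustrated with a small toy model (plain Java with invented names, not Flink internals): an operator only acknowledges a checkpoint once the barrier for that checkpoint has arrived on every input channel, so a channel fed by a FINISHED source, which will never deliver another barrier, stalls alignment indefinitely.

```java
import java.util.HashSet;
import java.util.Set;

// Toy barrier alignment: a checkpoint completes at this operator only
// after a barrier has arrived on every input channel. Not Flink code.
class BarrierAligner {
    private final int numChannels;
    private final Set<Integer> channelsSeen = new HashSet<>();

    BarrierAligner(int numChannels) {
        this.numChannels = numChannels;
    }

    // Returns true once barriers from all channels have arrived.
    boolean onBarrier(int channel) {
        channelsSeen.add(channel);
        return channelsSeen.size() == numChannels;
    }
}
```

With two input channels, a barrier on channel 0 alone never completes the checkpoint; if channel 1 belongs to a FINISHED source, `onBarrier(1)` is simply never called again, which matches the halted periodic checkpointing described above.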
Re: Manual trigger the window in fold operator or incremental aggregation
Hi Zhen Li,

You can control when a windowed stream emits data with "Triggers". See:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#triggers

Flink comes with a couple of default triggers, but you can also create your own by implementing
https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.html.

Note that this does not change the window itself; it just causes the windowed stream to emit intermediate results to the next operator.

Does this answer your question?

Cheers,
Niels

On Wed, Oct 17, 2018 at 12:34 PM zhen li wrote:
> Hi all:
> How can I trigger the window manually in a fold operator or incremental
> aggregation? For example, when some condition is met, although the count
> window or time window has not been reached?
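[Editor's note] A loose illustration of the custom-trigger idea (plain Java with simplified, invented names; the real interface is `org.apache.flink.streaming.api.windowing.triggers.Trigger`, whose onElement/onProcessingTime callbacks return a `TriggerResult`): a trigger can fire early when a per-element condition is met, without waiting for the count or time threshold.

```java
// Simplified stand-in for a custom trigger: fire when a per-element
// condition holds, or when the window has seen maxCount elements.
// Not the Flink Trigger API, just the decision logic.
class CountOrConditionTrigger {
    enum Result { CONTINUE, FIRE }

    private final int maxCount;
    private int count = 0;

    CountOrConditionTrigger(int maxCount) {
        this.maxCount = maxCount;
    }

    Result onElement(boolean conditionMet) {
        count++;
        if (conditionMet || count >= maxCount) {
            count = 0; // reset, as a purging trigger would
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }
}
```

In real Flink code the count would live in the trigger's partitioned state (`TriggerContext`), since one trigger instance serves many keys and windows.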
Checkpointing when one of the sources has completed
Hi All,

I am debugging an issue where the periodic checkpointing has halted. I noticed that one of the sources of my job has completed (FINISHED). The other sources and operators would, however, still be able to produce output.

Does anyone know if Flink's periodic checkpoints are supposed to continue when one or more sources of a job are in the "FINISHED" state?

Cheers,
Niels
Re: Akka Http used in custom RichSourceFunction
Hi,

It was indeed the problem, and shading my Akka dependency has solved it. Thank you for pointing that out!

For reference: when shading Akka you also need to merge the reference.conf files from Akka, or it will fail. This page contains useful documentation on how to shade Akka:
https://doc.akka.io/docs/akka/2.5.12/general/configuration.html#When_using_JarJar__OneJar__Assembly_or_any_jar-bundler

Example POM: https://github.com/nvankaam/websocketclient

Niels

On Fri, May 25, 2018 at 11:00 AM Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi,
>
> Yes, this might be the cause of the issue, because indeed it looks like
> your Akka version is leaking into Flink's classloader.
>
> Piotrek
>
> On 25 May 2018, at 09:40, Niels van Kaam <ni...@vankaam.net> wrote:
>
> Hi Piotrek,
>
> Thank you for your response!
>
> I am currently just testing the job in a local environment. I think that
> means all classes are on the Java classpath, which might also be the issue
> then. If I am correct, that means I am currently not using dynamic
> classloading and am just overwriting the Akka version, also for Flink.
>
> I will try moving my websocket connector to a separate package and shading
> its Akka dependency.
>
> Code that starts the job:
> https://github.com/nvankaam/flinkwebsocketsource/blob/master/src/main/scala/net/vankaam/flink/WebSocketSample.scala
>
> Dependencies:
> https://github.com/nvankaam/flinkwebsocketsource/blob/master/build.sbt
>
> The full stack trace of the exception (I think this is the shutdown of the
> Flink minicluster):
>
> Exception in thread "main" java.lang.NoSuchMethodError: akka.actor.ActorSystem.shutdown()V
>     at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5$$anonfun$apply$7.apply(FlinkMiniCluster.scala:483)
>     at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5$$anonfun$apply$7.apply(FlinkMiniCluster.scala:483)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5.apply(FlinkMiniCluster.scala:483)
>     at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5.apply(FlinkMiniCluster.scala:483)
>     at scala.Option.foreach(Option.scala:257)
>     at org.apache.flink.runtime.minicluster.FlinkMiniCluster.startInternalShutdown(FlinkMiniCluster.scala:482)
>     at org.apache.flink.runtime.minicluster.FlinkMiniCluster.stop(FlinkMiniCluster.scala:434)
>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:112)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1501)
>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:629)
>     at net.vankaam.flink.WebSocketSample$.main(WebSocketSample.scala:42)
>     at net.vankaam.flink.WebSocketSample.main(WebSocketSample.scala)
>
> Cheers,
> Niels
>
> On Thu, May 24, 2018 at 4:08 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
>
>> Hi,
>>
>> Please take a look at
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/debugging_classloading.html.
>> Especially check whether you are using the child-first class loading
>> config. If it doesn't help, you should probably shade your Akka dependency.
>>
>> What is the full exception? Is it thrown when YOUR code tries to
>> shut down, or when FLINK's code tries to shut down?
>>
>> Piotrek
>>
>> On 24 May 2018, at 14:38, Niels van Kaam <ni...@vankaam.net> wrote:
>>
>> Hi All,
>>
>> I wrote a custom source function (RichSourceFunction) which connects to a
>> web socket using the Akka HTTP library. The job using this source runs fine
>> in a local environment until, upon shutdown, I see the following error in the
>> log: "Exception in thread "main" java.lang.NoSuchMethodError:
>> akka.actor.ActorSystem.shutdown()V"
>>
>> My impression is that the issue is caused by a version conflict between
>> Flink's Akka dependency and my own (due to Akka HTTP). This seems to be
>> related to this issue: https://issues.apache.org/jira/browse/FLINK-9240
>>
>> Can I somehow avoid this conflict?
>> If not, does this mean I should avoid using Akka (or at least other
>> versions than Flink's) within my sources/sinks?
>> Or can I safely catch and ignore the error?
>>
>> My dependencies are:
>> Flink: 1.4.2
>> akka-actor: 2.5.12
>> akka-stream: 2.5.12
>> akka-http: 10.1.1
>>
>> Thank you for your help!
>>
>> Cheers,
>> Niels
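[Editor's note] For readers on Maven rather than sbt: merging the reference.conf files mentioned above is typically done with the Shade Plugin's AppendingTransformer, alongside the relocation. A sketch follows; the relocation pattern `shaded.akka` is a placeholder, and note that fully relocating Akka can additionally require adjusting class names referenced from its configuration (see the Akka documentation page linked in the message above).

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <transformers>
      <!-- Concatenate the reference.conf files from all Akka jars;
           without this the shaded jar fails at config load time. -->
      <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
        <resource>reference.conf</resource>
      </transformer>
    </transformers>
    <relocations>
      <!-- Move the bundled Akka out of the way of Flink's Akka. -->
      <relocation>
        <pattern>akka</pattern>
        <shadedPattern>shaded.akka</shadedPattern>
      </relocation>
    </relocations>
  </configuration>
</plugin>
```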
Re: Akka Http used in custom RichSourceFunction
Hi Piotrek,

Thank you for your response!

I am currently just testing the job in a local environment. I think that means all classes are on the Java classpath, which might also be the issue then. If I am correct, that means I am currently not using dynamic classloading and am just overwriting the Akka version, also for Flink.

I will try moving my websocket connector to a separate package and shading its Akka dependency.

Code that starts the job:
https://github.com/nvankaam/flinkwebsocketsource/blob/master/src/main/scala/net/vankaam/flink/WebSocketSample.scala

Dependencies:
https://github.com/nvankaam/flinkwebsocketsource/blob/master/build.sbt

The full stack trace of the exception (I think this is the shutdown of the Flink minicluster):

Exception in thread "main" java.lang.NoSuchMethodError: akka.actor.ActorSystem.shutdown()V
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5$$anonfun$apply$7.apply(FlinkMiniCluster.scala:483)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5$$anonfun$apply$7.apply(FlinkMiniCluster.scala:483)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5.apply(FlinkMiniCluster.scala:483)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5.apply(FlinkMiniCluster.scala:483)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster.startInternalShutdown(FlinkMiniCluster.scala:482)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster.stop(FlinkMiniCluster.scala:434)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:112)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1501)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:629)
    at net.vankaam.flink.WebSocketSample$.main(WebSocketSample.scala:42)
    at net.vankaam.flink.WebSocketSample.main(WebSocketSample.scala)

Cheers,
Niels

On Thu, May 24, 2018 at 4:08 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi,
>
> Please take a look at
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/debugging_classloading.html.
> Especially check whether you are using the child-first class loading
> config. If it doesn't help, you should probably shade your Akka dependency.
>
> What is the full exception? Is it thrown when YOUR code tries to
> shut down, or when FLINK's code tries to shut down?
>
> Piotrek
>
> On 24 May 2018, at 14:38, Niels van Kaam <ni...@vankaam.net> wrote:
>
> Hi All,
>
> I wrote a custom source function (RichSourceFunction) which connects to a
> web socket using the Akka HTTP library. The job using this source runs fine
> in a local environment until, upon shutdown, I see the following error in the
> log: "Exception in thread "main" java.lang.NoSuchMethodError:
> akka.actor.ActorSystem.shutdown()V"
>
> My impression is that the issue is caused by a version conflict between
> Flink's Akka dependency and my own (due to Akka HTTP). This seems to be
> related to this issue: https://issues.apache.org/jira/browse/FLINK-9240
>
> Can I somehow avoid this conflict?
> If not, does this mean I should avoid using Akka (or at least other
> versions than Flink's) within my sources/sinks?
> Or can I safely catch and ignore the error?
>
> My dependencies are:
> Flink: 1.4.2
> akka-actor: 2.5.12
> akka-stream: 2.5.12
> akka-http: 10.1.1
>
> Thank you for your help!
>
> Cheers,
> Niels
Akka Http used in custom RichSourceFunction
Hi All,

I wrote a custom source function (RichSourceFunction) which connects to a web socket using the Akka HTTP library. The job using this source runs fine in a local environment until, upon shutdown, I see the following error in the log: "Exception in thread "main" java.lang.NoSuchMethodError: akka.actor.ActorSystem.shutdown()V"

My impression is that the issue is caused by a version conflict between Flink's Akka dependency and my own (due to Akka HTTP). This seems to be related to this issue: https://issues.apache.org/jira/browse/FLINK-9240

Can I somehow avoid this conflict?
If not, does this mean I should avoid using Akka (or at least other versions than Flink's) within my sources/sinks?
Or can I safely catch and ignore the error?

My dependencies are:
Flink: 1.4.2
akka-actor: 2.5.12
akka-stream: 2.5.12
akka-http: 10.1.1

Thank you for your help!

Cheers,
Niels
Re: "Close()" aborts last transaction in TwoPhaseCommitSinkFunction
Thank you! I already have a custom source function, so adding the hacky solution would not be too much effort. Looking forward to the "proper" solution!

Niels

On Fri, Mar 9, 2018, 16:00 Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi,
>
> The short answer is: no, at the moment a clean shutdown is not implemented
> for streaming, but it's on our to-do list for the future.
>
> Hacky answer: you could implement some custom code that would wait for at
> least one completed checkpoint after the last input data. But that would
> require modifying the source function, or at least wrapping it, and there
> might be some corner cases that I haven't thought about.
>
> Piotrek
>
> On 9 Mar 2018, at 14:49, Niels van Kaam <ni...@vankaam.net> wrote:
>
> Hi,
>
> I'm working on a custom implementation of a sink which I would like to use
> with exactly-once semantics. Therefore I have implemented the
> TwoPhaseCommitSinkFunction class as mentioned in this recent post:
> https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
>
> I have some integration tests which run jobs using the custom sink with a
> finite dataset (a RichSourceFunction with a "finite" run method). The tests
> fail because of missing data. I noticed that this is due to the last
> transaction being aborted.
>
> Looking at the source code, that makes sense, because the close()
> implementation of TwoPhaseCommitSinkFunction calls abort on the current
> transaction:
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
>
> I could override this behaviour and perform a commit, but then I would be
> committing without the checkpoint-completed notification, thus not properly
> maintaining the exactly-once guarantees.
>
> Is it possible (and if so, how) to have end-to-end exactly-once guarantees
> when dealing with (sometimes) finite jobs?
>
> Thanks!
> Niels
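[Editor's note] The hacky workaround Piotr describes, waiting for at least one completed checkpoint after the last input, could be modeled roughly like this. This is plain Java with invented names sketching only the control flow, not a working Flink source; in Flink the logic would live in a SourceFunction that also implements CheckpointListener, with run() blocking until it is safe to return.

```java
import java.util.concurrent.atomic.AtomicLong;

// Toy model of "keep the source alive until one checkpoint that covers
// all emitted data has completed". Names are invented; not Flink code.
class DrainingSourceModel {
    private volatile long requiredCheckpoint = -1; // first checkpoint started after the last record
    private final AtomicLong latestTriggered = new AtomicLong(0);
    private volatile boolean safeToClose = false;

    void onLastRecordEmitted() {
        // the next checkpoint to be triggered must cover all emitted data
        requiredCheckpoint = latestTriggered.get() + 1;
    }

    void onCheckpointTriggered(long checkpointId) {
        latestTriggered.set(checkpointId);
    }

    void notifyCheckpointComplete(long checkpointId) {
        if (requiredCheckpoint >= 0 && checkpointId >= requiredCheckpoint) {
            safeToClose = true; // the sink's final transaction is committed; run() may return
        }
    }

    boolean safeToClose() {
        return safeToClose;
    }
}
```

The point of the extra wait is that by the time run() returns and close() aborts the open transaction, that transaction is empty: everything emitted before the final checkpoint was already pre-committed and then committed by the checkpoint-complete notification.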
"Close()" aborts last transaction in TwoPhaseCommitSinkFunction
Hi,

I'm working on a custom implementation of a sink which I would like to use with exactly-once semantics. Therefore I have implemented the TwoPhaseCommitSinkFunction class as mentioned in this recent post:
https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html

I have some integration tests which run jobs using the custom sink with a finite dataset (a RichSourceFunction with a "finite" run method). The tests fail because of missing data. I noticed that this is due to the last transaction being aborted.

Looking at the source code, that makes sense, because the close() implementation of TwoPhaseCommitSinkFunction calls abort on the current transaction:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java

I could override this behaviour and perform a commit, but then I would be committing without the checkpoint-completed notification, thus not properly maintaining the exactly-once guarantees.

Is it possible (and if so, how) to have end-to-end exactly-once guarantees when dealing with (sometimes) finite jobs?

Thanks!
Niels