Re: Clean shutdown of streaming job

2018-10-22 Thread Niels van Kaam
Hi Ning,

I don't think it is possible to pause a Kafka source upon taking a
savepoint without making changes to its implementation.

I think your problem is that the Cassandra sink doesn't support exactly-once
guarantees when the Cassandra query isn't idempotent. If possible, the
cleanest solution would be to implement a new Cassandra sink (or extend the
existing one) based on the
https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html
abstract class, and to set your environment to the exactly-once guarantee.
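
A rough sketch of such a sink is below. To be clear, this is a minimal
skeleton under assumptions, not a working implementation: Cassandra has no
real transactions, so the buffering CassandraTxn type, the table and the
statement strings are all hypothetical, and the exact signature of invoke()
has changed slightly between Flink versions.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

public class TwoPhaseCassandraSink
        extends TwoPhaseCommitSinkFunction<String, TwoPhaseCassandraSink.CassandraTxn, Void> {

    // Hypothetical transaction handle: buffers CQL statements until commit.
    public static class CassandraTxn {
        final List<String> statements = new ArrayList<>();
    }

    public TwoPhaseCassandraSink() {
        super(new KryoSerializer<>(CassandraTxn.class, new ExecutionConfig()),
                VoidSerializer.INSTANCE);
    }

    @Override
    protected CassandraTxn beginTransaction() {
        return new CassandraTxn();
    }

    @Override
    protected void invoke(CassandraTxn txn, String value, Context context) {
        // Only buffer the write; nothing is sent to Cassandra yet.
        txn.statements.add("INSERT INTO my_table (v) VALUES ('" + value + "')");
    }

    @Override
    protected void preCommit(CassandraTxn txn) {
        // Called as part of the checkpoint; the pending transaction (including
        // the buffered statements) is snapshotted into state by the base class,
        // so there is nothing else to do in this sketch.
    }

    @Override
    protected void commit(CassandraTxn txn) {
        // Called once the checkpoint has completed. Note that commit() can be
        // retried after a failure, so the statements must tolerate re-execution.
        // txn.statements.forEach(session::execute);
    }

    @Override
    protected void abort(CassandraTxn txn) {
        txn.statements.clear();
    }
}

The key point is that commit() only runs after a checkpoint completes, and it
may be retried after a failure, which is exactly where a non-idempotent
Cassandra query remains problematic.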

--
Niels



On Mon, Oct 22, 2018 at 1:26 AM Ning Shi  wrote:

> I'm implementing a streaming job that consumes events from Kafka and
> writes results to Cassandra. The writes to Cassandra are not
> idempotent. In preparation for planned maintenance events like Flink
> version upgrades or job updates, I'd like to be able to shut down the job
> cleanly.
>
> I understand that cancelling with savepoint is currently not an atomic
> operation, meaning that there may be one or more events being processed
> after the savepoint is taken. In order to prevent any writes going to
> Cassandra after the savepoint is taken, I wonder if it's possible to
> pause the Kafka stream before I take the savepoint.
>
> I've seen people suggest using a control stream and unioning it with the
> Kafka stream to achieve this, but that doesn't actually stop the Kafka
> consumer offsets from advancing. I'm concerned that if I send the signal
> to the control stream and start to drop messages from Kafka, the offsets
> may still advance and the new offsets will be included in the
> savepoint. As a result, recovering from the savepoint will cause data
> loss.
>
> Is there any way to cleanly shut down a job or pause the Kafka source
> prior to taking a savepoint?
>
> Thanks,
>
> --
> Ning
>


Re: Manual trigger the window in fold operator or incremental aggregation

2018-10-17 Thread Niels van Kaam
Sorry, I wouldn't know that. I have worked with custom triggers, but I
haven't actually had to implement a custom window function yet.

Judging by the interfaces, I would say it is not possible.

Niels

On Wed, Oct 17, 2018 at 2:18 PM Ahmad Hassan  wrote:

> Hi Niels,
>
> Can we distinguish within the apply function of 'RichWindowFunction' whether
> it was called due to an onElement trigger call or an onProcessingTime trigger
> call of a custom Trigger?
>
> Thanks!
>
> On Wed, 17 Oct 2018 at 12:51, Niels van Kaam  wrote:
>
>> Hi Zhen Li,
>>
>> You can control when a windowed stream emits data with "Triggers". See:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#triggers
>>
>> Flink comes with a couple of default triggers, but you can also create
>> your own by implementing
>> https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.html
>> .
>>
>> Note that this does not change the window, but just causes the
>> WindowedStream to emit intermediate results to the next operator.
>>
>> Does this answer your question?
>>
>> Cheers,
>> Niels
>>
>> On Wed, Oct 17, 2018 at 12:34 PM zhen li  wrote:
>>
>>> Hi all:
>>> How can I trigger the window manually in a fold operator or
>>> incremental aggregation? For example, when some condition is met, although
>>> the count window or time window has not yet been reached.
>>
>>


Re: Checkpointing when one of the sources has completed

2018-10-17 Thread Niels van Kaam
Hi All,

Thanks for the responses; the finished source explains my issue, then. I can
work around the problem by letting my sources negotiate a "final"
checkpoint via ZooKeeper.

@Paul, I think your answer was meant for the earlier question asked by
Joshua?

Cheers,
Niels

On Wed, Oct 17, 2018 at 11:15 AM Joshua Fan  wrote:

> Hi Niels,
>
> Probably not. An operator does not begin a checkpoint until it has received
> the barriers from all of its upstream sources, so if one source cannot send a
> barrier, the downstream operators cannot checkpoint, FYI.
>
> Yours sincerely
> Joshua
>
> On Wed, Oct 17, 2018 at 4:58 PM Niels van Kaam  wrote:
>
>> Hi All,
>>
>> I am debugging an issue where the periodic checkpointing has halted. I
>> noticed that one of the sources of my job has completed (finished). The
>> other sources and operators would, however, still be able to produce output.
>>
>> Does anyone know if Flink's periodic checkpoints are supposed to continue
>> when one or more sources of a job are in the "FINISHED" state?
>>
>> Cheers,
>> Niels
>>
>>


Re: Manual trigger the window in fold operator or incremental aggregation

2018-10-17 Thread Niels van Kaam
Hi Zhen Li,

You can control when a windowed stream emits data with "Triggers". See:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#triggers

Flink comes with a couple of default triggers, but you can also create your
own by implementing
https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.html
.

Note that this does not change the window, but just causes the
WindowedStream to emit intermediate results to the next operator.
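
As an illustration, a minimal custom trigger could look like the sketch below.
The Event type and its flush flag are made up for the example; the assumption
is simply that an element can carry some condition that should force an early
emit. Note that this sketch never fires on time at all: a real trigger would
usually also register a timer for the end of the window (see EventTimeTrigger
for reference).

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

// Hypothetical element type carrying a flag that forces an early emit.
class Event {
    public String key;
    public long value;
    public boolean flush;
}

public class EarlyFireTrigger<W extends Window> extends Trigger<Event, W> {

    @Override
    public TriggerResult onElement(Event element, long timestamp, W window, TriggerContext ctx) {
        // Fire (and emit) the window contents early when the condition is met.
        return element.flush ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) {
        // No custom state or timers to clean up in this sketch.
    }
}

You would then attach it before your fold or aggregate, e.g.
stream.keyBy(...).timeWindow(...).trigger(new EarlyFireTrigger<>()).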

Does this answer your question?

Cheers,
Niels

On Wed, Oct 17, 2018 at 12:34 PM zhen li  wrote:

> Hi all:
> How can I trigger the window manually in a fold operator or incremental
> aggregation? For example, when some condition is met, although the count
> window or time window has not yet been reached.


Checkpointing when one of the sources has completed

2018-10-17 Thread Niels van Kaam
Hi All,

I am debugging an issue where the periodic checkpointing has halted. I
noticed that one of the sources of my job has completed (finished). The
other sources and operators would, however, still be able to produce output.

Does anyone know if Flink's periodic checkpoints are supposed to continue
when one or more sources of a job are in the "FINISHED" state?

Cheers,
Niels


Re: Akka Http used in custom RichSourceFunction

2018-05-25 Thread Niels van Kaam
Hi,

That was indeed the problem, and shading my Akka dependency has solved it.
Thank you for pointing that out!

For reference:

When shading Akka you also need to merge the reference.conf files from the
Akka jars, or it will fail at runtime. This page contains useful documentation
on how to shade Akka:
https://doc.akka.io/docs/akka/2.5.12/general/configuration.html#When_using_JarJar__OneJar__Assembly_or_any_jar-bundler

Example POM: https://github.com/nvankaam/websocketclient
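
As an example, with the maven-shade-plugin the relevant parts look roughly
like the snippet below (the relocation pattern and shaded package name are
just placeholders; see the example POM above for a complete setup):

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>akka</pattern>
            <shadedPattern>shaded.akka</shadedPattern>
          </relocation>
        </relocations>
        <transformers>
          <!-- Concatenate the reference.conf files instead of overwriting them -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
            <resource>reference.conf</resource>
          </transformer>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>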

Niels



On Fri, May 25, 2018 at 11:00 AM Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> Yes, this might be the cause of the issue, because indeed it looks like
> your akka’s version is leaking to Flink’s classloader.
>
> Piotrek
>
>
> On 25 May 2018, at 09:40, Niels van Kaam <ni...@vankaam.net> wrote:
>
> Hi Piotrek,
>
> Thank you for your response!
>
> I am currently just testing the job in a local environment. I think that
> means all classes are on the Java classpath, which might indeed be the
> issue.
> If that is correct, I am currently not using dynamic classloading and am
> simply overriding the Akka version, also for Flink.
>
> I will try moving my websocket connector to a separate package and shading
> its Akka dependency.
>
> Code that starts the job:
> https://github.com/nvankaam/flinkwebsocketsource/blob/master/src/main/scala/net/vankaam/flink/WebSocketSample.scala
>
> Dependencies:
> https://github.com/nvankaam/flinkwebsocketsource/blob/master/build.sbt
>
> The full stack trace of the exception (I think this is the shutdown of the
> Flink minicluster):
>
> Exception in thread "main" java.lang.NoSuchMethodError:
> akka.actor.ActorSystem.shutdown()V
> at
> org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5$$anonfun$apply$7.apply(FlinkMiniCluster.scala:483)
> at
> org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5$$anonfun$apply$7.apply(FlinkMiniCluster.scala:483)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5.apply(FlinkMiniCluster.scala:483)
> at
> org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5.apply(FlinkMiniCluster.scala:483)
> at scala.Option.foreach(Option.scala:257)
> at
> org.apache.flink.runtime.minicluster.FlinkMiniCluster.startInternalShutdown(FlinkMiniCluster.scala:482)
> at
> org.apache.flink.runtime.minicluster.FlinkMiniCluster.stop(FlinkMiniCluster.scala:434)
> at
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:112)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1501)
> at
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:629)
> at net.vankaam.flink.WebSocketSample$.main(WebSocketSample.scala:42)
> at net.vankaam.flink.WebSocketSample.main(WebSocketSample.scala)
>
>
> Cheers,
> Niels
>
>
> On Thu, May 24, 2018 at 4:08 PM Piotr Nowojski <pi...@data-artisans.com>
> wrote:
>
>> Hi,
>>
>> Please take a look at
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/debugging_classloading.html
>> and especially check whether you are using the child-first classloading
>> config. If that doesn’t help, you should probably shade your akka dependency.
>>
>> What is the full exception? Is it thrown when YOUR code tries to
>> shut down, or when FLINK’s code tries to shut down?
>>
>> Piotrek
>>
>>
>> On 24 May 2018, at 14:38, Niels van Kaam <ni...@vankaam.net> wrote:
>>
>> Hi All,
>>
>> I wrote a custom source function (RichSourceFunction) which connects to a
>> web socket using the Akka HTTP library. The job using this source runs fine
>> in a local environment, but upon shutdown I see the following error in the
>> log: "Exception in thread "main" java.lang.NoSuchMethodError:
>> akka.actor.ActorSystem.shutdown()V"
>>
>> My impression is that the issue is caused by a version conflict between
>> Flink's Akka dependency and my own (pulled in by Akka HTTP). This seems to be
>> related to this issue: https://issues.apache.org/jira/browse/FLINK-9240
>>
>> Can I somehow avoid this conflict?
>> If not, does this mean I should avoid using Akka (or at least other
>> versions than Flink's) within my sources/sinks?
>> Or can I safely catch and ignore the error?
>>
>> My dependencies are:
>> Flink: 1.4.2
>> akka-actor: 2.5.12
>> akka-stream: 2.5.12
>> akka-http: 10.1.1
>>
>> Thank you for your help!
>>
>> Cheers,
>> Niels
>>
>>
>>
>>
>


Re: Akka Http used in custom RichSourceFunction

2018-05-25 Thread Niels van Kaam
Hi Piotrek,

Thank you for your response!

I am currently just testing the job in a local environment. I think that
means all classes are on the Java classpath, which might indeed be the
issue.
If that is correct, I am currently not using dynamic classloading and am
simply overriding the Akka version, also for Flink.

I will try moving my websocket connector to a separate package and shading
its Akka dependency.

Code that starts the job:
https://github.com/nvankaam/flinkwebsocketsource/blob/master/src/main/scala/net/vankaam/flink/WebSocketSample.scala

Dependencies:
https://github.com/nvankaam/flinkwebsocketsource/blob/master/build.sbt

The full stack trace of the exception (I think this is the shutdown of the
Flink minicluster):

Exception in thread "main" java.lang.NoSuchMethodError:
akka.actor.ActorSystem.shutdown()V
at
org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5$$anonfun$apply$7.apply(FlinkMiniCluster.scala:483)
at
org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5$$anonfun$apply$7.apply(FlinkMiniCluster.scala:483)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at
org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5.apply(FlinkMiniCluster.scala:483)
at
org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5.apply(FlinkMiniCluster.scala:483)
at scala.Option.foreach(Option.scala:257)
at
org.apache.flink.runtime.minicluster.FlinkMiniCluster.startInternalShutdown(FlinkMiniCluster.scala:482)
at
org.apache.flink.runtime.minicluster.FlinkMiniCluster.stop(FlinkMiniCluster.scala:434)
at
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:112)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1501)
at
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:629)
at net.vankaam.flink.WebSocketSample$.main(WebSocketSample.scala:42)
at net.vankaam.flink.WebSocketSample.main(WebSocketSample.scala)


Cheers,
Niels


On Thu, May 24, 2018 at 4:08 PM Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> Please take a look at
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/debugging_classloading.html
> and especially check whether you are using the child-first classloading
> config. If that doesn’t help, you should probably shade your akka dependency.
>
> What is the full exception? Is it thrown when YOUR code tries to
> shut down, or when FLINK’s code tries to shut down?
>
> Piotrek
>
>
> On 24 May 2018, at 14:38, Niels van Kaam <ni...@vankaam.net> wrote:
>
> Hi All,
>
> I wrote a custom source function (RichSourceFunction) which connects to a
> web socket using the Akka HTTP library. The job using this source runs fine
> in a local environment, but upon shutdown I see the following error in the
> log: "Exception in thread "main" java.lang.NoSuchMethodError:
> akka.actor.ActorSystem.shutdown()V"
>
> My impression is that the issue is caused by a version conflict between Flink's
> Akka dependency and my own (pulled in by Akka HTTP). This seems to be related
> to this issue: https://issues.apache.org/jira/browse/FLINK-9240
>
> Can I somehow avoid this conflict?
> If not, does this mean I should avoid using Akka (or at least other
> versions than Flink's) within my sources/sinks?
> Or can I safely catch and ignore the error?
>
> My dependencies are:
> Flink: 1.4.2
> akka-actor: 2.5.12
> akka-stream: 2.5.12
> akka-http: 10.1.1
>
> Thank you for your help!
>
> Cheers,
> Niels
>
>
>
>


Akka Http used in custom RichSourceFunction

2018-05-24 Thread Niels van Kaam
Hi All,

I wrote a custom source function (RichSourceFunction) which connects to a
web socket using the Akka HTTP library. The job using this source runs fine
in a local environment, but upon shutdown I see the following error in the
log: "Exception in thread "main" java.lang.NoSuchMethodError:
akka.actor.ActorSystem.shutdown()V"

My impression is that the issue is caused by a version conflict between Flink's
Akka dependency and my own (pulled in by Akka HTTP). This seems to be related
to this issue: https://issues.apache.org/jira/browse/FLINK-9240

Can I somehow avoid this conflict?
If not, does this mean I should avoid using Akka (or at least other
versions than Flink's) within my sources/sinks?
Or can I safely catch and ignore the error?

My dependencies are:
Flink: 1.4.2
akka-actor: 2.5.12
akka-stream: 2.5.12
akka-http: 10.1.1

Thank you for your help!

Cheers,
Niels


Re: "Close()" aborts last transaction in TwoPhaseCommitSinkFunction

2018-03-09 Thread Niels van Kaam
Thank you! I already have a custom source function, so adding the hacky
solution would not be too much effort.

Looking forward to the "proper" solution!
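
For reference, a sketch of that hack under the stated assumptions is below: a
finite source that implements CheckpointListener and keeps running until one
checkpoint has completed after the last record, so a two-phase-commit sink
downstream gets a chance to commit its final transaction. Everything here
(class name, record count, polling loop) is illustrative, and the corner cases
Piotrek mentions still apply; strictly speaking, one should also verify that
the completed checkpoint was triggered after the last record.

import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class DrainingSource extends RichSourceFunction<String>
        implements CheckpointListener {

    private volatile boolean running = true;
    private volatile boolean done = false;      // all records emitted
    private volatile boolean committed = false; // a checkpoint completed after 'done'

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        for (int i = 0; i < 100 && running; i++) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect("record-" + i);
            }
        }
        done = true;
        // Keep the source (and thus checkpointing) alive until at least one
        // checkpoint completes, then finish normally.
        while (running && !committed) {
            Thread.sleep(50);
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        if (done) {
            committed = true;
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}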

Niels

On Fri, Mar 9, 2018, 16:00 Piotr Nowojski <pi...@data-artisans.com> wrote:

> Hi,
>
> Short answer is: no, at the moment a clean shutdown is not implemented for
> streaming, but it’s on our to-do list for the future.
>
> Hacky answer: you could implement some custom code that waits for at
> least one completed checkpoint after the last input data. But that would
> require modifying a source function, or at least wrapping it, and there might
> be some corner cases that I haven’t thought about.
>
> Piotrek
>
>
> On 9 Mar 2018, at 14:49, Niels van Kaam <ni...@vankaam.net> wrote:
>
> Hi,
>
> I'm working on a custom implementation of a sink which I would like to use
> with exactly-once semantics. Therefore I have extended the
> TwoPhaseCommitSinkFunction class as mentioned in this recent post:
> https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
>
> I have some integration tests which run jobs using the custom sink with a
> finite dataset (a RichSourceFunction with a "finite" run method). The tests
> fail because of missing data. I noticed that this is due to the last transaction
> being aborted.
>
> When looking into the source code, that makes sense, because the close()
> implementation of TwoPhaseCommitSinkFunction calls abort on the current
> transaction:
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
>
>
> I could override this behaviour and perform a commit, but then I would
> be committing without having received the checkpoint-completed notification,
> thus not properly maintaining exactly-once guarantees.
>
> Is it possible (and how) to have end-to-end exactly-once guarantees
> when dealing with (sometimes) finite jobs?
>
> Thanks!
> Niels
>
>
>


"Close()" aborts last transaction in TwoPhaseCommitSinkFunction

2018-03-09 Thread Niels van Kaam
Hi,

I'm working on a custom implementation of a sink which I would like to use
with exactly-once semantics. Therefore I have extended the
TwoPhaseCommitSinkFunction class as mentioned in this recent post:
https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html

I have some integration tests which run jobs using the custom sink with a
finite dataset (a RichSourceFunction with a "finite" run method). The tests
fail because of missing data. I noticed that this is due to the last transaction
being aborted.

When looking into the source code, that makes sense, because the close()
implementation of TwoPhaseCommitSinkFunction calls abort on the current
transaction:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java


I could override this behaviour and perform a commit, but then I would
be committing without having received the checkpoint-completed notification,
thus not properly maintaining exactly-once guarantees.

Is it possible (and how) to have end-to-end exactly-once guarantees when
dealing with (sometimes) finite jobs?

Thanks!
Niels