Providing files during application mode deployment

2021-11-09 Thread Vasily Melnik
Hi all.

While running Flink jobs in application mode on YARN and Kubernetes, we need to
provide some configuration files to the main class. Is there any option in the
Flink CLI to copy local files to the cluster without manually copying them to DFS
or baking them into the Docker image, something like the *--files* option in spark-submit?


Reuse in Blink execution plan

2021-09-02 Thread Vasily Melnik
Hi all.

Using SQL with the Blink planner for batch calculations, I see *Reused* nodes
in the Optimized Execution Plan when performing self-join operations:


== Optimized Execution Plan ==
Union(all=[true], union=[id, v, v0, w0$o0])
:- OverAggregate(orderBy=[id DESC], window#0=[ROW_NUMBER(*) AS w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[id, v, v0, w0$o0])(reuse_id=[2])
:  +- Sort(orderBy=[id DESC])
:     +- Exchange(distribution=[single])
:        +- Calc(select=[id, v, v0])
:           +- HashJoin(joinType=[LeftOuterJoin], where=[($f2 = id0)], select=[id, v, $f2, id0, v0], build=[right])
:              :- Exchange(distribution=[hash[$f2]])
:              :  +- Calc(select=[id, v, (id + 1) AS $f2])
:              :     +- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[id, v])(reuse_id=[1])
:              +- Exchange(distribution=[hash[id]])
:                 +- *Reused*(reference_id=[1])
+- *Reused*(reference_id=[2])


The question is: are these steps (scans, intermediate calculations) really
computed only once, or is this just a shorthand in the printed plan?
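For reference, a minimal sketch (Java Table API) of the kind of query that produces such a plan. The table t1(id, v) comes from the plan above, but the query itself is a guess at a shape that triggers sub-plan reuse, not the original statement:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Blink planner in batch mode; t1 is assumed to be registered already
// (e.g. via a CREATE TABLE DDL), which is not shown here.
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inBatchMode()
        .build();
TableEnvironment tEnv = TableEnvironment.create(settings);

// A self-join on t1 plus a UNION ALL over the same derived table:
// both the scan of t1 and the OverAggregate sub-plan appear twice
// logically, so the optimizer prints reuse_id / Reused nodes.
String query =
    "WITH j AS ( " +
    "  SELECT a.id, a.v, b.v AS v0, " +
    "         ROW_NUMBER() OVER (ORDER BY a.id DESC) AS w0 " +
    "  FROM t1 a LEFT JOIN t1 b ON a.id + 1 = b.id) " +
    "SELECT * FROM j " +
    "UNION ALL " +
    "SELECT * FROM j";

System.out.println(tEnv.explainSql(query));

As far as I understand, a reuse_id/Reused pair means the optimizer has deduplicated a common sub-plan (governed by table.optimizer.reuse-sub-plan-enabled and table.optimizer.reuse-source-enabled, both enabled by default), so the reused scan and aggregation are meant to run once and feed both consumers rather than be recomputed.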


Re: State expiration in Flink

2020-06-01 Thread Vasily Melnik
Thanks, Yun!

One more question: is it possible to register some kind of handler that is
called when state is cleaned up? For example, I want to flush the state to
external storage (e.g. HBase) before cleanup. Currently we do this manually
with the onTimer method, but is there another way?
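For context, a rough sketch of the onTimer workaround mentioned above (the state type, the TTL value and flushToHBase are placeholders, not a real Flink or HBase API):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Keep a per-key value and flush it to external storage (e.g. HBase)
// from our own timer before clearing it, instead of relying on TTL cleanup.
public class FlushOnExpiry extends KeyedProcessFunction<String, String, String> {

    private static final long TTL_MS = 3_600_000L; // 1 hour, arbitrary
    private transient ValueState<String> lastValue;

    @Override
    public void open(Configuration parameters) {
        lastValue = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-value", String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        lastValue.update(value);
        // register an expiration timer for this key
        // (for brevity the previous timer is not deleted, so a stale timer may flush early)
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + TTL_MS);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        String v = lastValue.value();
        if (v != null) {
            flushToHBase(ctx.getCurrentKey(), v); // placeholder for the external write
            lastValue.clear();
        }
    }

    private void flushToHBase(String key, String value) {
        // hypothetical external sink call, not shown
    }
}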


On Mon, 1 Jun 2020 at 05:28, Yun Tang  wrote:

> Hi Vasily
>
> Since Flink 1.10, state is cleaned up periodically because
> CleanupInBackground is enabled by default. Thus, even if you never access
> a specific entry of the state, that entry can still be cleaned up.
>
> Best
> Yun Tang
> ----------
> *From:* Vasily Melnik 
> *Sent:* Saturday, May 30, 2020 23:29
> *To:* user 
> *Subject:* State expiration in Flink
>
> Hi.
> I'm a bit confused by this point in the State TTL documentation:
> "By default, expired values are explicitly removed on read, such as
> ValueState#value, and periodically garbage collected in the background if
> supported by the configured state backend."
> Does it mean that if I have only one event with a specific key, its state
> will never be cleaned up on TTL expiration, because I will never call the
> value method for this key again?
>
>
>


State expiration in Flink

2020-05-30 Thread Vasily Melnik
Hi.
I'm a bit confused by this point in the State TTL documentation:
"By default, expired values are explicitly removed on read, such as
ValueState#value, and periodically garbage collected in the background if
supported by the configured state backend."
Does it mean that if I have only one event with a specific key, its state
will never be cleaned up on TTL expiration, because I will never call the
value method for this key again?
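For reference, a minimal sketch of enabling TTL with background cleanup, which is what lets an entry expire even if its value() is never read again (per Yun Tang's reply above, available from Flink 1.10 on; the descriptor name and TTL value here are arbitrary):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// TTL config with background cleanup enabled explicitly.
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.hours(1))                                        // arbitrary TTL
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .cleanupInBackground()
        .build();

ValueStateDescriptor<String> descriptor =
        new ValueStateDescriptor<>("my-state", String.class);
descriptor.enableTimeToLive(ttlConfig);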


Re: REST rescale with Flink on YARN

2020-01-23 Thread Vasily Melnik
Hi all,
I've found a solution to this issue.
The problem is that through the YARN ApplicationMaster URL we talk to the
JobManager via a proxy, which is implemented on Jetty 6 (for Hadoop 2.6).
So to use the PATCH method we need to locate the original JobManager URL.
Using the /jobmanager/config API we can get only the host, while web.port is
reported as 0 (???).
To find the actual web port, we have to parse the YARN logs of the JobManager,
where we can find something like this:

*INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint - Rest
endpoint listening at :.*

Maybe someone knows a less complicated way to find the actual REST URL under
YARN?
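Once the real host and port are known, a minimal sketch of issuing the rescale call from Java, assuming Java 11's java.net.http client (which allows arbitrary HTTP methods); <jobmanager-host> and <rest-port> are placeholders, while the job id and parallelism are the ones from this thread:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Send PATCH directly to the JobManager REST endpoint,
// bypassing the YARN proxy (old Jetty) that rejects PATCH.
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://<jobmanager-host>:<rest-port>"
                + "/jobs/11dcfc3163936fc019e049fc841b075b/rescaling?parallelism=3"))
        .method("PATCH", HttpRequest.BodyPublishers.noBody())
        .build();
HttpResponse<String> response =
        client.send(request, HttpResponse.BodyHandlers.ofString());
System.out.println(response.statusCode() + " " + response.body());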




Best regards,
Vasily Melnik


On Thu, 23 Jan 2020 at 15:32, Chesnay Schepler  wrote:

> Older versions of Jetty don't support PATCH requests. You will either have
> to update it or create a custom Flink version that uses POST for the
> rescale operation.
>
> On 23/01/2020 13:23, Vasily Melnik wrote:
>
> Hi all.
> I'm using Flink 1.8 on YARN with CDH 5.12
> When I try to perform a rescale request:
>
> curl -v -X PATCH
> '/proxy/application_1576854986116_0079/jobs/11dcfc3163936fc019e049fc841b075b/rescaling?parallelism=3'
>
> I get an error:
>
> *Method PATCH is not defined in RFC 2068 and is not supported by the
> Servlet API*
>
> GET and POST methods work well.
> The Server type in the response is Jetty(6.1.26.cloudera.4).
>
> How can I deal with this situation?
>
> Best regards,
> Vasily Melnik
>
>
>


REST rescale with Flink on YARN

2020-01-23 Thread Vasily Melnik
Hi all.
I'm using Flink 1.8 on YARN with CDH 5.12.
When I try to perform a rescale request:

curl -v -X PATCH
'/proxy/application_1576854986116_0079/jobs/11dcfc3163936fc019e049fc841b075b/rescaling?parallelism=3'

I get an error:

*Method PATCH is not defined in RFC 2068 and is not supported by the
Servlet API*

GET and POST methods work well.
The Server type in the response is Jetty(6.1.26.cloudera.4).

How can I deal with this situation?

Best regards,
Vasily Melnik


Re: State name uniqueness

2020-01-20 Thread Vasily Melnik
Gordon, great thanks!

Best regards,
Vasily Melnik


On Mon, 20 Jan 2020 at 13:56, Tzu-Li (Gordon) Tai 
wrote:

> Hi Vasily,
>
> State names need to be unique within operators only.
>
> Cheers,
> Gordon
>
> On Mon, Jan 20, 2020 at 10:58 AM Vasily Melnik <
> vasily.mel...@glowbyteconsulting.com> wrote:
>
>> Hi all,
>>
>> I'm a bit confused about state name uniqueness.
>> Should it be unique within an operator only, or within the entire job?
>>
>> Best regards,
>> Vasily Melnik
>>
>
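For reference, a minimal sketch of what "unique within operators only" means in practice (the class, state name and types are made up): each keyed operator declares its own state, so a second operator in the same job may also declare state named "counter" without a clash.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// This operator owns a state named "counter"; another operator elsewhere
// in the same job can declare "counter" too, since the scope is per operator.
public class CounterFunction extends KeyedProcessFunction<String, String, Long> {

    private transient ValueState<Long> counter;

    @Override
    public void open(Configuration parameters) {
        counter = getRuntimeContext().getState(
                new ValueStateDescriptor<>("counter", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
        Long current = counter.value();
        long updated = (current == null) ? 1L : current + 1;
        counter.update(updated);
        out.collect(updated);
    }
}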


State name uniqueness

2020-01-20 Thread Vasily Melnik
Hi all,

I'm a bit confused about state name uniqueness.
Should it be unique within an operator only, or within the entire job?

Best regards,
Vasily Melnik


Re: Broadcast checkpoint serialization fail

2019-11-18 Thread Vasily Melnik
Hi all,
We found the solution:
the problem was the Comparator of the TreeSet we used as the value of the
broadcast state. Kryo is unable to serialize a lambda Comparator, so we
changed it to a regular class - and everything is fine now.
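For reference, a rough sketch of the change (MyEvent and its getTs() getter are made up for illustration):

import java.io.Serializable;
import java.util.Comparator;
import java.util.TreeSet;

// A regular, named, serializable comparator class instead of a lambda.
public class TsComparator implements Comparator<MyEvent>, Serializable {
    @Override
    public int compare(MyEvent a, MyEvent b) {
        return Long.compare(a.getTs(), b.getTs());
    }
}

// Before (fails at checkpoint time - Kryo cannot serialize the synthetic lambda class):
//   TreeSet<MyEvent> set = new TreeSet<>((a, b) -> Long.compare(a.getTs(), b.getTs()));
// After (checkpoints fine):
//   TreeSet<MyEvent> set = new TreeSet<>(new TsComparator());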


Best regards,
Vasily Melnik

*Glow**Byte Consulting* <http://www.gbconsulting.ru/>

===

Mobile: +7 (903) 101-43-71
vasily.mel...@glowbyteconsulting.com


On Fri, 15 Nov 2019 at 14:29, Vasily Melnik <
vasily.mel...@glowbyteconsulting.com> wrote:

> Hi all.
> In Flink 1.8 we get a strange exception that causes the job to fail:
>
> 2019-11-14 15:52:52,071 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph- op4 (1/1)
> (797d4c2b85010dab6be5e1d06ff6493a) switched from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize
> checkpoint 2 for operator op4 (1/1).}
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for
> operator op4 (1/1).
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> ... 6 more
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.NullPointerException
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> ... 5 more
> Caused by: java.lang.NullPointerException
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:80)
> at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:593)
> at
> com.esotericsoftware.kryo.serializers.DefaultSerializers$TreeSetSerializer.write(DefaultSerializers.java:608)
> at
> com.esotericsoftware.kryo.serializers.DefaultSerializers$TreeSetSerializer.write(DefaultSerializers.java:605)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
> at
> org.apache.flink.runtime.state.HeapBroadcastState.write(HeapBroadcastState.java:109)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:167)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
> at
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
> ... 7 more
>
> As we can see, the exception occurs in
> *org.apache.flink.runtime.state.HeapBroadcastState.write(HeapBroadcastState.java:109)*
> but what exactly is the reason?
>
> We configured the RocksDB state backend for the job with local filesystem storage.
>
>
> Best regards,
> Vasily Melnik
>


Broadcast checkpoint serialization fail

2019-11-15 Thread Vasily Melnik
Hi all.
In Flink 1.8 we get a strange exception that causes the job to fail:

2019-11-14 15:52:52,071 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph- op4 (1/1)
(797d4c2b85010dab6be5e1d06ff6493a) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint
2 for operator op4 (1/1).}
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for
operator op4 (1/1).
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
... 6 more
Caused by: java.util.concurrent.ExecutionException:
java.lang.NullPointerException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
at
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
... 5 more
Caused by: java.lang.NullPointerException
at
com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:80)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:593)
at
com.esotericsoftware.kryo.serializers.DefaultSerializers$TreeSetSerializer.write(DefaultSerializers.java:608)
at
com.esotericsoftware.kryo.serializers.DefaultSerializers$TreeSetSerializer.write(DefaultSerializers.java:605)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
at
org.apache.flink.runtime.state.HeapBroadcastState.write(HeapBroadcastState.java:109)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:167)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
at
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
... 7 more

As we can see, the exception occurs in
*org.apache.flink.runtime.state.HeapBroadcastState.write(HeapBroadcastState.java:109)*
but what exactly is the reason?

We configured the RocksDB state backend for the job with local filesystem storage.


Best regards,
Vasily Melnik


Re: Initialization of broadcast state before processing main stream

2019-11-15 Thread Vasily Melnik
Maxim, great thanks.
We'll try buffering.
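For the record, a rough sketch of the buffering approach (Maxim's option 1): fact events are parked in keyed state until a readiness marker shows up in the broadcast state, then replayed from a timer. The Fact/Dim/Enriched types, their getters, the descriptors and the "__READY__" marker are all made-up placeholders.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Buffer facts per key until the dimension (broadcast) state contains a
// "__READY__" marker, then replay the buffer via a processing-time timer.
public class BufferingJoin
        extends KeyedBroadcastProcessFunction<String, Fact, Dim, Enriched> {

    // broadcast state: dimension key -> dimension record, plus a marker entry
    static final MapStateDescriptor<String, Dim> DIM_STATE =
            new MapStateDescriptor<>("dims", String.class, Dim.class);

    private transient ListState<Fact> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-facts", Fact.class));
    }

    @Override
    public void processElement(Fact fact, ReadOnlyContext ctx, Collector<Enriched> out) throws Exception {
        ReadOnlyBroadcastState<String, Dim> dims = ctx.getBroadcastState(DIM_STATE);
        if (dims.contains("__READY__")) {
            out.collect(enrich(fact, dims.get(fact.getDimKey())));
        } else {
            buffer.add(fact);  // dimension not fully loaded yet: park the fact and retry shortly
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 1000);
        }
    }

    @Override
    public void processBroadcastElement(Dim dim, Context ctx, Collector<Enriched> out) throws Exception {
        // the dimension source is assumed to emit a special "end" record once fully read
        ctx.getBroadcastState(DIM_STATE).put(dim.isEndMarker() ? "__READY__" : dim.getKey(), dim);
    }

    @Override
    public void onTimer(long ts, OnTimerContext ctx, Collector<Enriched> out) throws Exception {
        ReadOnlyBroadcastState<String, Dim> dims = ctx.getBroadcastState(DIM_STATE);
        if (!dims.contains("__READY__")) {
            ctx.timerService().registerProcessingTimeTimer(ts + 1000);  // still waiting
            return;
        }
        for (Fact fact : buffer.get()) {
            out.collect(enrich(fact, dims.get(fact.getDimKey())));
        }
        buffer.clear();
    }

    private Enriched enrich(Fact fact, Dim dim) {
        return new Enriched(fact, dim);  // placeholder
    }
}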

Best regards,
Vasily Melnik


On Thu, 14 Nov 2019 at 19:36, Maxim Parkachov  wrote:

> Hi Vasily,
>
> unfortunately, this is known issue with Flink, you could read discussion
> under
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
>  .
>
> At the moment I have seen 3 solutions for this issue:
>
> 1. You buffer fact stream in local state before broadcast is completely
> read
> 2. You create custom source for fact stream and in open method wait before
> broadcast stream is completely read.
> 3. With latest Flink version, you could pre-populate state with dimension
> and start Flink job with existing state. You need to take care of setting
> correct kafka offsets for dimension stream though, otherwise you will get a
> gap between pre-populated state and moment when job is started.
>
> First 2  solutions need to know when broadcast stream is "completely
> read". I created workaround for this issue with custom source for dimension
> events. It creates "stop file" on shared file system, reads with admin
> interface kafka end offsets for dimension topic, start processing all
> messages from beginning and clears "stop file" after offset of messages
> reached end offsets for all partitions. Instead of "stop file" you could
> use shared lock in zookeeper.
>
> Hope this helps,
> Maxim.
>
> On Thu, Nov 14, 2019 at 7:42 AM vino yang  wrote:
>
>> Hi Vasily,
>>
>> Currently, Flink does not coordinate between a regular stream and a
>> broadcast stream; they are both streams. Your scenario of using the broadcast
>> state is a special one. In the more general case, the state to be
>> broadcast is an unbounded stream, and the state events may be broadcast to
>> the downstream at any time. So it cannot be waited on to finish before
>> processing the usual stream events.
>>
>> For your scenario:
>>
>>
>>- you can change the storage of the dimension table, e.g. to Redis or
>>MySQL and so on, and do the stream / dimension-table join against it;
>>- you can inject a control event into your broadcast stream to mark
>>the end of the stream and let the fact stream wait until it receives the control
>>event. Or you can introduce a third-party coordinator, e.g. ZooKeeper, to
>>coordinate them; however, it would make your solution more complex.
>>
>> Best,
>> Vino
>>
>>
>> Vasily Melnik  wrote on Thu, 14 Nov 2019 at 13:28:
>>
>>> Hi all.
>>>
>>> In our task we have two Kafka topics:
>>> - one with fact stream (web traffic)
>>> - one with dimension
>>>
>>> We would like to put the dimension data into broadcast state and look it
>>> up when processing facts. But we see that not all dimension records are put into
>>> state before the first fact record is processed, so the lookup returns no data.
>>>
>>> The question is: how could we read the fact topic with some "delay" to give
>>> the dimension stream enough time to initialize the state?
>>>
>>>
>>> С уважением,
>>> Василий Мельник
>>>
>>


Initialization of broadcast state before processing main stream

2019-11-13 Thread Vasily Melnik
Hi all.

In our task we have two Kafka topics:
- one with fact stream (web traffic)
- one with dimension

We would like to put the dimension data into broadcast state and look it up
when processing facts. But we see that not all dimension records are put into
state before the first fact record is processed, so the lookup returns no data.

The question is: how could we read the fact topic with some "delay" to give
the dimension stream enough time to initialize the state?


Best regards,
Vasily Melnik


Set flink-conf parameters dynamically

2019-09-19 Thread Vasily Melnik
Hi all,
I wonder whether it is possible to pass a custom flink-conf file as a parameter
to the run utility, or to change config parameters manually in Java code?
I'd like to change the metrics.scope parameter for each job independently.

Best regards,
Vasily Melnik


Re: Flink metrics scope for YARN single job

2019-08-15 Thread Vasily Melnik
Hi Biao!

>  Do you mean "distinguish metrics from different JobManager running on
same host"?
Exactly.

>Will give you a feedback if there is a conclusion.
Thanks!



On Thu, 15 Aug 2019 at 06:40, Biao Liu  wrote:

> Hi Vasily,
>
> > Is there any way to distinguish logs from different JobManager running
> on same host?
>
> Do you mean "distinguish metrics from different JobManager running on
> same host"?
> I guess there is no other variable you could use for now.
>
> But I think it's reasonable to support this requirement. I would like to
> discuss with the devs to hear their opinions. Will give you a feedback if
> there is a conclusion.
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Wed, 14 Aug 2019 at 19:46, Vasily Melnik <
> vasily.mel...@glowbyteconsulting.com> wrote:
>
>> Hi,
>> I want to run Flink apps on YARN in single job mode and keep metrics in
>> Graphite. But as far as I can see, the only variable I can use for JobManager scope
>> customization is:
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#list-of-all-variables
>>
>> Is there any way to distinguish logs from different JobManagers running on
>> the same host?
>>
>>
>> Thanks in advance.
>>
>


Flink metrics scope for YARN single job

2019-08-14 Thread Vasily Melnik
Hi,
I want to run Flink apps on YARN in single job mode and keep metrics in
Graphite. But as far as I can see, the only variable I can use for JobManager scope
customization is:
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#list-of-all-variables

Is there any way to distinguish logs from different JobManagers running on
the same host?


Thanks in advance.


Re: Exactly-once semantics in Flink Kafka Producer

2019-08-02 Thread Vasily Melnik
Hi, Maxim.
My console-consumer command:
kafka-console-consumer --zookeeper ... --topic test --from-beginning
--isolation-level read_committed
It works perfectly well with a manually written Kafka producer - it reads
data only after commitTransaction.
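For completeness, a minimal sketch of the same read_committed check done from Java with a plain kafka-clients consumer (kafka-clients 2.x assumed); the bootstrap servers, group id and topic are placeholders:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// A read_committed consumer only sees records of committed transactions,
// i.e. what the exactly-once Flink producer commits on checkpoint completion.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");          // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "exactly-once-check");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("test"));                   // output topic
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.offset() + " " + record.value());
        }
    }
}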

On Fri, 2 Aug 2019 at 14:19, Maxim Parkachov  wrote:

> Hi Vasily,
>
> as far as I know, by default console-consumer reads uncommited.
> Try setting  isolation.level to read_committed in console-consumer
> properties.
>
> Hope this helps,
> Maxim.
>
> On Fri, Aug 2, 2019 at 12:42 PM Vasily Melnik <
> vasily.mel...@glowbyteconsulting.com> wrote:
>
>> Hi, Eduardo.
>> Maybe I should describe the experiment design more precisely:
>> 1) I run Flink on YARN (YARN session mode).
>> 2) I do not stop/cancel the application, I just kill the TaskManager process.
>> 3) After that YARN creates another TaskManager process and an automatic
>> checkpoint restore from HDFS happens.
>>
>> That's why I expect to see a correct restore.
>>
>> Best regards,
>> Vasily Melnik
>>
>> *Glow**Byte Consulting* <http://www.gbconsulting.ru/>
>>
>> ===
>>
>> Mobile: +7 (903) 101-43-71
>> vasily.mel...@glowbyteconsulting.com
>>
>>
>> On Fri, 2 Aug 2019 at 13:04, Eduardo Winpenny Tejedor <
>> eduardo.winpe...@gmail.com> wrote:
>>
>>> Hi Vasily,
>>>
>>> You're probably executing this from your IDE or from a local Flink
>>> cluster without starting your job from a checkpoint.
>>>
>>> When you start your Flink job for the second time you need to specify
>>> the path to the latest checkpoint as an argument, otherwise Flink will
>>> start from scratch.
>>>
>>> You're probably thinking that's not great, ideally Flink should be able
>>> to automatically continue from the last produced checkpoint, and actually
>>> that's what the docs say! Well, that's only when you're running in a proper
>>> cluster environment. Flink is able to recover using checkpoints when only
>>> part of the cluster fails, not when the whole job is stopped. For full
>>> stops you need to specify the checkpoint manually.
>>>
>>> Hope that helps!
>>>
>>>
>>> On Fri, 2 Aug 2019, 10:05 Vasily Melnik, <
>>> vasily.mel...@glowbyteconsulting.com> wrote:
>>>
>>>> I,m trying to test Flink 1.8.0 exactly-once semantics with Kafka Source
>>>> and Sink:
>>>>
>>>>1. Run flink app, simply transferring messages from one topic to
>>>>another with parallelism=1, checkpoint interval 20 seconds
>>>>2. Generate messages with incrementing integer numbers using Python
>>>>script each 2 seconds.
>>>>3. Read output topic with console consumer in read_committed
>>>>isolation level.
>>>>4. Manually kill TaskManager
>>>>
>>>> I expect to see monotonically increasing integers in output topic
>>>> regardless TaskManager killing and recovery.
>>>>
>>>> But actually a see something unexpected in console-consumer output:
>>>>
>>>> 32
>>>> 33
>>>> 34
>>>> 35
>>>> 36
>>>> 37
>>>> 38
>>>> 39
>>>> 40
>>>> -- TaskManager Killed
>>>> 32
>>>> 34
>>>> 35
>>>> 36
>>>> 40
>>>> 41
>>>> 46
>>>> 31
>>>> 33
>>>> 37
>>>> 38
>>>> 39
>>>> 42
>>>> 43
>>>> 44
>>>> 45
>>>>
>>>> Looks like all messages between checkpoints where replayed in output
>>>> topic. Also i expected to see results in output topic only after
>>>> checkpointing i.e. each 20 seconds, but messages appeared in output
>>>> immediately as they where send to input.
>>>> Is it supposed to be correct behaviour or i do something wrong?
>>>>
>>>> Kafka version 1.0.1 from Cloudera parcels. I tested Kafka transactional
>>>> producer and read-committed console comsumer with custom code and it worked
>>>> perfectly well reading messages only after commitTransaction on producer.
>>>>
>>>> My Flink code:
>>>>
>>>> StreamExecutionEnvironment env = 
>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>> env.getConfig(

Re: Exactly-once semantics in Flink Kafka Producer

2019-08-02 Thread Vasily Melnik
Hi, Eduardo.
Maybe I should describe the experiment design more precisely:
1) I run Flink on YARN (YARN session mode).
2) I do not stop/cancel the application, I just kill the TaskManager process.
3) After that YARN creates another TaskManager process and an automatic
checkpoint restore from HDFS happens.

That's why I expect to see a correct restore.

Best regards,
Vasily Melnik

*Glow**Byte Consulting* <http://www.gbconsulting.ru/>

===

Mobile: +7 (903) 101-43-71
vasily.mel...@glowbyteconsulting.com


On Fri, 2 Aug 2019 at 13:04, Eduardo Winpenny Tejedor <
eduardo.winpe...@gmail.com> wrote:

> Hi Vasily,
>
> You're probably executing this from your IDE or from a local Flink cluster
> without starting your job from a checkpoint.
>
> When you start your Flink job for the second time you need to specify the
> path to the latest checkpoint as an argument, otherwise Flink will start
> from scratch.
>
> You're probably thinking that's not great, ideally Flink should be able to
> automatically continue from the last produced checkpoint, and actually
> that's what the docs say! Well, that's only when you're running in a proper
> cluster environment. Flink is able to recover using checkpoints when only
> part of the cluster fails, not when the whole job is stopped. For full
> stops you need to specify the checkpoint manually.
>
> Hope that helps!
>
>
> On Fri, 2 Aug 2019, 10:05 Vasily Melnik, <
> vasily.mel...@glowbyteconsulting.com> wrote:
>
>> I,m trying to test Flink 1.8.0 exactly-once semantics with Kafka Source
>> and Sink:
>>
>>1. Run flink app, simply transferring messages from one topic to
>>another with parallelism=1, checkpoint interval 20 seconds
>>2. Generate messages with incrementing integer numbers using Python
>>script each 2 seconds.
>>3. Read output topic with console consumer in read_committed
>>isolation level.
>>4. Manually kill TaskManager
>>
>> I expect to see monotonically increasing integers in output topic
>> regardless TaskManager killing and recovery.
>>
>> But actually a see something unexpected in console-consumer output:
>>
>> 32
>> 33
>> 34
>> 35
>> 36
>> 37
>> 38
>> 39
>> 40
>> -- TaskManager Killed
>> 32
>> 34
>> 35
>> 36
>> 40
>> 41
>> 46
>> 31
>> 33
>> 37
>> 38
>> 39
>> 42
>> 43
>> 44
>> 45
>>
>> Looks like all messages between checkpoints where replayed in output
>> topic. Also i expected to see results in output topic only after
>> checkpointing i.e. each 20 seconds, but messages appeared in output
>> immediately as they where send to input.
>> Is it supposed to be correct behaviour or i do something wrong?
>>
>> Kafka version 1.0.1 from Cloudera parcels. I tested Kafka transactional
>> producer and read-committed console comsumer with custom code and it worked
>> perfectly well reading messages only after commitTransaction on producer.
>>
>> My Flink code:
>>
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> env.getConfig().setAutoWatermarkInterval(1000);
>> env.enableCheckpointing(2, CheckpointingMode.EXACTLY_ONCE);
>> env.setStateBackend(new 
>> RocksDBStateBackend("hdfs:///checkpoints-data"));
>>
>> Properties producerProperty = new Properties();
>> producerProperty.setProperty("bootstrap.servers", ...);
>> producerProperty.setProperty("zookeeper.connect", ...);
>> 
>> producerProperty.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,"1");
>> 
>> producerProperty.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-transaction");
>> 
>> producerProperty.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
>> "true");
>>
>> Properties consumerProperty = new Properties();
>> consumerProperty.setProperty("bootstrap.servers", ...);
>> consumerProperty.setProperty("zookeeper.connect", ...);
>> consumerProperty.setProperty("group.id", "test2");
>>
>> FlinkKafkaConsumer consumer1 = new 
>> FlinkKafkaConsumer("stringTopic1", new ComplexStringSchema(), 
>> consumerProperty);
>> consumer1.assignTimestampsAndWatermarks(new PeriodicAssigner());
>>
>> FlinkKafkaProducer producer1 = new 
>> FlinkKafkaProducer("test",  new KeyedSerializationSchemaWrapper(new 
>> SimpleStringSchema()), producerProperty, 
>> FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>> producer1.ignoreFailuresAfterTransactionTimeout();
>> DataStreamSource s1 = env.addSource(consumer1);
>> s1.addSink(producer1);
>> env.execute("Test");
>>
>>
>>


Exactly-once semantics in Flink Kafka Producer

2019-08-02 Thread Vasily Melnik
I'm trying to test Flink 1.8.0 exactly-once semantics with a Kafka source and
sink:

   1. Run a Flink app that simply transfers messages from one topic to another
   with parallelism=1 and a checkpoint interval of 20 seconds.
   2. Generate messages with incrementing integer numbers using a Python
   script every 2 seconds.
   3. Read the output topic with the console consumer in read_committed
   isolation level.
   4. Manually kill the TaskManager.

I expect to see monotonically increasing integers in the output topic
regardless of the TaskManager being killed and recovered.

But actually I see something unexpected in the console-consumer output:

32
33
34
35
36
37
38
39
40
-- TaskManager Killed
32
34
35
36
40
41
46
31
33
37
38
39
42
43
44
45

It looks like all messages between checkpoints were replayed to the output topic.
Also, I expected to see results in the output topic only after checkpointing,
i.e. every 20 seconds, but messages appeared in the output immediately as they
were sent to the input.
Is this the expected behaviour, or am I doing something wrong?

Kafka version is 1.0.1 from Cloudera parcels. I tested a Kafka transactional
producer and a read-committed console consumer with custom code, and it worked
perfectly well, reading messages only after commitTransaction on the producer.

My Flink code:

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);
env.enableCheckpointing(2, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new
RocksDBStateBackend("hdfs:///checkpoints-data"));

Properties producerProperty = new Properties();
producerProperty.setProperty("bootstrap.servers", ...);
producerProperty.setProperty("zookeeper.connect", ...);

producerProperty.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,"1");

producerProperty.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-transaction");
producerProperty.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
"true");

Properties consumerProperty = new Properties();
consumerProperty.setProperty("bootstrap.servers", ...);
consumerProperty.setProperty("zookeeper.connect", ...);
consumerProperty.setProperty("group.id", "test2");

FlinkKafkaConsumer<String> consumer1 = new
FlinkKafkaConsumer<>("stringTopic1", new ComplexStringSchema(),
consumerProperty);
consumer1.assignTimestampsAndWatermarks(new PeriodicAssigner());

FlinkKafkaProducer<String> producer1 = new
FlinkKafkaProducer<>("test",  new
KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
producerProperty, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
producer1.ignoreFailuresAfterTransactionTimeout();
DataStreamSource<String> s1 = env.addSource(consumer1);
s1.addSink(producer1);
env.execute("Test");