event time and late events - documentation

2018-07-16 Thread Sofer, Tovi
Hi group,
Can someone please elaborate on the comment at the end of section "Debugging 
Windows & Event Time"?
I didn't understand its meaning.
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html
"Handling Event Time Stragglers
Approach 1: Watermark stays late (indicated completeness), windows fire early
Approach 2: Watermark heuristic with maximum lateness, windows accept late data"
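
For reference, a minimal sketch of what Approach 2 looks like in the DataStream API
(the "Trade" event class, its fields and the aggregate function are made up; the
Flink calls - BoundedOutOfOrdernessTimestampExtractor, allowedLateness,
sideOutputLateData - are standard 1.x APIs):

// Approach 2 sketch: heuristic watermark with bounded lateness, windows accept late data
OutputTag<Trade> lateTag = new OutputTag<Trade>("late-trades") {};

SingleOutputStreamOperator<TradeSummary> summaries = trades
    .assignTimestampsAndWatermarks(
        // watermark lags the maximum seen event time by 5 seconds
        new BoundedOutOfOrdernessTimestampExtractor<Trade>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(Trade t) {
                return t.getEventTime();
            }
        })
    .keyBy("symbol")
    .timeWindow(Time.minutes(1))
    .allowedLateness(Time.minutes(1))   // windows accept late data for one more minute
    .sideOutputLateData(lateTag)        // anything later still goes to a side output
    .aggregate(new TradeSummaryAggregate());

DataStream<Trade> lateTrades = summaries.getSideOutput(lateTag);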

Thanks,
Tovi


FW: high availability with automated disaster recovery using zookeeper

2018-07-16 Thread Sofer, Tovi
Thank you Scott,
Looks like a very elegant solution.

How did you manage high availability in a single data center?

Thanks,
Tovi

From: Scott Kidder 
Sent: Friday, 13 July 2018 01:13
To: Sofer, Tovi [ICG-IT] 
Cc: user@flink.apache.org
Subject: Re: high availability with automated disaster recovery using zookeeper

I've used a multi-datacenter Consul cluster to coordinate 
service discovery. When a service starts up in the primary DC, it registers 
itself in Consul with a key that has a TTL that must be periodically renewed. 
If the service shuts down or terminates abruptly, the key expires and is 
removed from Consul. A standby service in another DC can be started 
automatically after detecting the absence of the key in Consul in the primary 
DC. This could lead to submitting a job to the standby Flink cluster from the 
most recent savepoint that was copied by the offline process you mentioned. It 
should be pretty easy to automate all of this. I would not recommend setting up 
a multi-datacenter Zookeeper cluster; in my experience, Consul is much easier 
to work with.
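
(For illustration only, not from Scott's mail: a rough sketch of that pattern against
Consul's HTTP API. The endpoints are standard Consul, but the key name, TTL value and
agent address are assumptions.)

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PrimaryDcHeartbeat {
    private static final String CONSUL = "http://localhost:8500"; // assumed local Consul agent
    private static final String KEY = "flink/primary-dc-alive";   // hypothetical key name

    private final HttpClient http = HttpClient.newHttpClient();

    // Create a session with a TTL; if the primary stops renewing it, the session is
    // invalidated and (with Behavior=delete) the acquired key is removed from Consul.
    String createSession() throws Exception {
        HttpRequest req = HttpRequest.newBuilder(URI.create(CONSUL + "/v1/session/create"))
                .PUT(HttpRequest.BodyPublishers.ofString("{\"TTL\": \"15s\", \"Behavior\": \"delete\"}"))
                .build();
        String body = http.send(req, HttpResponse.BodyHandlers.ofString()).body();
        return body.replaceAll("(?s).*\"ID\"\\s*:\\s*\"([^\"]+)\".*", "$1"); // naive JSON extraction
    }

    // Register the "primary is alive" key under the session.
    void registerAlive(String sessionId) throws Exception {
        HttpRequest req = HttpRequest.newBuilder(
                URI.create(CONSUL + "/v1/kv/" + KEY + "?acquire=" + sessionId))
                .PUT(HttpRequest.BodyPublishers.ofString("primary"))
                .build();
        http.send(req, HttpResponse.BodyHandlers.ofString());
    }

    // The standby DC polls this; a 404 means the primary's key has expired, so the
    // watchdog can submit the job to the standby cluster from the latest copied savepoint.
    boolean primaryIsAlive() throws Exception {
        HttpRequest req = HttpRequest.newBuilder(URI.create(CONSUL + "/v1/kv/" + KEY)).GET().build();
        return http.send(req, HttpResponse.BodyHandlers.ofString()).statusCode() == 200;
    }
}

(The primary would keep the session alive with periodic PUTs to /v1/session/renew/<session-id>.)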

Best,

--
Scott Kidder


On Mon, Jul 9, 2018 at 4:48 AM Sofer, Tovi <tovi.so...@citi.com> wrote:
Hi all,

We are now examining how to achieve high availability for Flink, and also to 
support automatic recovery in a disaster scenario – when a whole DC goes down.
We have DC1, where we usually want the work to be done, and DC2 – which is more 
remote and where we want work to go only when DC1 is down.

We examined a few options and would be glad to hear feedback or a suggestion for 
another way to achieve this.

• Two separate ZooKeeper and Flink clusters on the two data 
centers.
Only the cluster on DC1 is running, and state is copied to DC2 in an offline 
process.

To achieve automatic recovery we need to use some kind of watchdog which will 
check DC1 availability, and if it is down will start DC2 (and the same later if 
DC2 is down).

Is there a recommended tool for this?

• A ZooKeeper “stretch cluster” across the data centers – with 2 nodes on DC1, 
2 nodes on DC2 and one observer node.

Also a Flink cluster with jobmanager1 on DC1 and jobmanager2 on DC2.

This way, when DC1 is down, ZooKeeper will notice this automatically and will 
transfer work to jobmanager2 on DC2.

However, we would like the ZooKeeper leader and the Flink JobManager leader 
(primary one) to be from DC1 – unless it is down.

Is there a way to achieve this?

Thanks and regards,
Tovi Sofer
Software Engineer
+972 (3) 7405756



RE: high availability with automated disaster recovery using zookeeper

2018-07-10 Thread Sofer, Tovi
To add one thing to the Mesos question –
my assumption that constraints on the JobManager can work is based on the 
sentence from the link below:
“When running Flink with Marathon, the whole Flink cluster including the job 
manager will be run as Mesos tasks in the Mesos cluster.”
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/mesos.html

[Not sure this is accurate, since it seems to contradict the image in link below
https://mesosphere.com/blog/apache-flink-on-dcos-and-apache-mesos ]

From: Sofer, Tovi [ICG-IT]
Sent: Tuesday, 10 July 2018 20:04
To: 'Till Rohrmann' ; user 
Cc: Gardi, Hila [ICG-IT] 
Subject: RE: high availability with automated disaster recovery using zookeeper

Hi Till, group,

Thank you for your response.
After reading further online about Mesos – can't Mesos fulfill the requirement of 
running the job manager on the primary server?
By using: “constraints”: [[“datacenter”, “CLUSTER”, “main”]]
(See 
http://www.stratio.com/blog/mesos-multi-data-center-architecture-for-disaster-recovery/
 )

Is this supported by a Flink cluster on Mesos?

Thanks again
Tovi

From: Till Rohrmann <trohrm...@apache.org>
Sent: Tuesday, 10 July 2018 10:11
To: Sofer, Tovi [ICG-IT] <ts72...@imceu.eu.ssmb.com>
Cc: user <user@flink.apache.org>
Subject: Re: high availability with automated disaster recovery using zookeeper

Hi Tovi,

that is an interesting use case you are describing here. I think, however, it 
depends mainly on the capabilities of ZooKeeper to produce the intended 
behavior. Flink itself relies on ZooKeeper for leader election in HA mode but 
does not expose any means to influence the leader election process. To be more 
precise, ZK is used as a black box which simply tells a JobManager that it is now 
the leader, independent of any data center preferences. I'm not sure whether it 
is possible to tell ZooKeeper about these preferences. If not, then an 
alternative could be to implement one's own high availability services which 
take these preferences into account.

Cheers,
Till

On Mon, Jul 9, 2018 at 1:48 PM Sofer, Tovi <tovi.so...@citi.com> wrote:
Hi all,

We are now examining how to achieve high availability for Flink, and also to 
support automatic recovery in a disaster scenario – when a whole DC goes down.
We have DC1, where we usually want the work to be done, and DC2 – which is more 
remote and where we want work to go only when DC1 is down.

We examined a few options and would be glad to hear feedback or a suggestion for 
another way to achieve this.

• Two separate ZooKeeper and Flink clusters on the two data 
centers.
Only the cluster on DC1 is running, and state is copied to DC2 in an offline 
process.

To achieve automatic recovery we need to use some kind of watchdog which will 
check DC1 availability, and if it is down will start DC2 (and the same later if 
DC2 is down).

Is there a recommended tool for this?

• A ZooKeeper “stretch cluster” across the data centers – with 2 nodes on DC1, 
2 nodes on DC2 and one observer node.

Also a Flink cluster with jobmanager1 on DC1 and jobmanager2 on DC2.

This way, when DC1 is down, ZooKeeper will notice this automatically and will 
transfer work to jobmanager2 on DC2.

However, we would like the ZooKeeper leader and the Flink JobManager leader 
(primary one) to be from DC1 – unless it is down.

Is there a way to achieve this?

Thanks and regards,
Tovi Sofer
Software Engineer
+972 (3) 7405756



RE: high availability with automated disaster recovery using zookeeper

2018-07-10 Thread Sofer, Tovi
Hi Till, group,

Thank you for your response.
After reading further online about Mesos – can't Mesos fulfill the requirement of 
running the job manager on the primary server?
By using: “constraints”: [[“datacenter”, “CLUSTER”, “main”]]
(See 
http://www.stratio.com/blog/mesos-multi-data-center-architecture-for-disaster-recovery/
 )

Is this supported by a Flink cluster on Mesos?

Thanks again
Tovi

From: Till Rohrmann 
Sent: Tuesday, 10 July 2018 10:11
To: Sofer, Tovi [ICG-IT] 
Cc: user 
Subject: Re: high availability with automated disaster recovery using zookeeper

Hi Tovi,

that is an interesting use case you are describing here. I think, however, it 
depends mainly on the capabilities of ZooKeeper to produce the intended 
behavior. Flink itself relies on ZooKeeper for leader election in HA mode but 
does not expose any means to influence the leader election process. To be more 
precise, ZK is used as a black box which simply tells a JobManager that it is now 
the leader, independent of any data center preferences. I'm not sure whether it 
is possible to tell ZooKeeper about these preferences. If not, then an 
alternative could be to implement one's own high availability services which 
take these preferences into account.

Cheers,
Till

On Mon, Jul 9, 2018 at 1:48 PM Sofer, Tovi <tovi.so...@citi.com> wrote:
Hi all,

We are now examining how to achieve high availability for Flink, and also to 
support automatic recovery in a disaster scenario – when a whole DC goes down.
We have DC1, where we usually want the work to be done, and DC2 – which is more 
remote and where we want work to go only when DC1 is down.

We examined a few options and would be glad to hear feedback or a suggestion for 
another way to achieve this.

• Two separate ZooKeeper and Flink clusters on the two data 
centers.
Only the cluster on DC1 is running, and state is copied to DC2 in an offline 
process.

To achieve automatic recovery we need to use some kind of watchdog which will 
check DC1 availability, and if it is down will start DC2 (and the same later if 
DC2 is down).

Is there a recommended tool for this?

• A ZooKeeper “stretch cluster” across the data centers – with 2 nodes on DC1, 
2 nodes on DC2 and one observer node.

Also a Flink cluster with jobmanager1 on DC1 and jobmanager2 on DC2.

This way, when DC1 is down, ZooKeeper will notice this automatically and will 
transfer work to jobmanager2 on DC2.

However, we would like the ZooKeeper leader and the Flink JobManager leader 
(primary one) to be from DC1 – unless it is down.

Is there a way to achieve this?

Thanks and regards,
Tovi Sofer
Software Engineer
+972 (3) 7405756



high availability with automated disaster recovery using zookeeper

2018-07-09 Thread Sofer, Tovi
Hi all,

We are now examining how to achieve high availability for Flink, and also to 
support automatic recovery in a disaster scenario - when a whole DC goes down.
We have DC1, where we usually want the work to be done, and DC2 - which is more 
remote and where we want work to go only when DC1 is down.

We examined a few options and would be glad to hear feedback or a suggestion for 
another way to achieve this.

* Two separate ZooKeeper and Flink clusters on the two data 
centers.
Only the cluster on DC1 is running, and state is copied to DC2 in an offline 
process.

To achieve automatic recovery we need to use some kind of watchdog which will 
check DC1 availability, and if it is down will start DC2 (and the same later if 
DC2 is down).

Is there a recommended tool for this?

* A ZooKeeper "stretch cluster" across the data centers - with 2 nodes on DC1, 
2 nodes on DC2 and one observer node.

Also a Flink cluster with jobmanager1 on DC1 and jobmanager2 on DC2.

This way, when DC1 is down, ZooKeeper will notice this automatically and will 
transfer work to jobmanager2 on DC2.

However, we would like the ZooKeeper leader and the Flink JobManager leader 
(primary one) to be from DC1 - unless it is down.

Is there a way to achieve this?

Thanks and regards,
Tovi Sofer
Software Engineer
+972 (3) 7405756



RE: kafka as recovery only source

2018-02-07 Thread Sofer, Tovi
Hi Fabian,

Thank you for the suggestion. We will consider it.
Would be glad to hear other ideas on how to handle such a requirement.

Thanks again,
Tovi
From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Wednesday, 07 February 2018 11:47
To: Sofer, Tovi [ICG-IT] <ts72...@imceu.eu.ssmb.com>
Cc: user@flink.apache.org; Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Subject: Re: kafka as recovery only source

Hi Tovi,
I've been thinking about this idea.
It might be possible, but I think you have to implement a custom source for 
this.

I don't think it would work to have the JMSConsumer, KafkaSink, and 
RecoverySource in separate operators because otherwise it would not be possible 
to share the Kafka write offset of the recovery topic at checkpoints.
QueryableState as you suggested only works for keyed state which is (AFAIK) not 
available for sources.
The custom source operator would consume from JMS and directly write all 
records to Kafka. In case of a recovery, it starts reading from Kafka and 
continues with JMS once the recovery topic has been completely consumed.
If you run the source in parallel, you need to handle the partitions of Kafka 
recovery topic.
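
(An editorial sketch, not Fabian's code: a rough outline of such a combined source, where
the JMS and Kafka plumbing is stubbed out and only the offset-in-operator-state part uses
real Flink APIs - CheckpointedFunction with ListState. All class and method names other
than the Flink ones are made up.)

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class JmsWithKafkaRecoverySource extends RichSourceFunction<String>
        implements CheckpointedFunction {

    private volatile boolean running = true;
    private transient ListState<Long> recoveryOffsetState; // last offset written to the recovery topic
    private long recoveryOffset = 0L;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // 1) Recovery phase (omitted): re-read the Kafka recovery topic from
        //    `recoveryOffset` up to its current end and emit those records.
        // 2) Live phase: consume from JMS, write each message to the recovery topic
        //    and emit it, updating the offset under the checkpoint lock so that the
        //    offset and the emitted records stay consistent in a checkpoint.
        while (running) {
            String msg = pollJms();                          // stub for the JMS consumer
            synchronized (ctx.getCheckpointLock()) {
                recoveryOffset = writeToRecoveryTopic(msg);  // stub for the Kafka producer
                ctx.collect(msg);
            }
        }
    }

    @Override
    public void cancel() { running = false; }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        recoveryOffsetState.clear();
        recoveryOffsetState.add(recoveryOffset);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        recoveryOffsetState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("recovery-offset", Long.class));
        for (Long offset : recoveryOffsetState.get()) {
            recoveryOffset = offset;                         // restored after a failure
        }
    }

    private String pollJms() { throw new UnsupportedOperationException("JMS consumer stub"); }
    private long writeToRecoveryTopic(String msg) { throw new UnsupportedOperationException("Kafka producer stub"); }
}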

I'm adding Gordon to this thread who might have additional comments or ideas.
Best, Fabian


2018-02-06 15:31 GMT+01:00 Sofer, Tovi <tovi.so...@citi.com>:
Hi group,

I wanted to get your suggestion on how to implement two requirements we have:

• One is to read from an external message queue (JMS) at very low latency.

• Second is to support zero data loss, so that in case of restart and 
recovery, messages not checkpointed (and not part of state) will be replayed 
again.

(which indicates some kind of replayable source)

Because of the first requirement we can’t write JMS messages to Kafka first and 
only then read from Kafka, because it will increase latency.
Instead we thought to consume the JMS messages and forward them both to the job and 
to a KafkaSink.
Then in case of failure and recovery, we want to start in recovery mode, and 
read messages from the offset matching the state/checkpoint.
How can this be done? We thought to somehow save in the state the last flushed 
Kafka offset.
The problem is that this information is available only via a future/interceptor and we 
don’t know how to connect it to state, so RecoverySource can use it…

So current suggestion looks something like:

Happy path:
JMSQueue-> JMSConsumer -> JobMessageParser(and additional operators), KafkaSink
(Here maybe we can add ProducerInterceptor-> which saves offset to state 
somehow)

Failure path: (will run before HappyPath to recover data)
RecoverySource-> JobMessageParser(and additional operators)
(Here maybe add Queryable state client which reads offsets from other operator 
state)

Thanks,
Tovi




kafka as recovery only source

2018-02-06 Thread Sofer, Tovi
Hi group,

I wanted to get your suggestion on how to implement two requirements we have:

* One is to read from an external message queue (JMS) at very low latency.

* Second is to support zero data loss, so that in case of restart and 
recovery, messages not checkpointed (and not part of state) will be replayed 
again.

(which indicates some kind of replayable source)

Because of the first requirement we can't write JMS messages to Kafka first and 
only then read from Kafka, because it will increase latency.
Instead we thought to consume the JMS messages and forward them both to the job and 
to a KafkaSink.
Then in case of failure and recovery, we want to start in recovery mode, and 
read messages from the offset matching the state/checkpoint.
How can this be done? We thought to somehow save in the state the last flushed 
Kafka offset.
The problem is that this information is available only via a future/interceptor and we 
don't know how to connect it to state, so RecoverySource can use it...

So current suggestion looks something like:

Happy path:
JMSQueue-> JMSConsumer -> JobMessageParser(and additional operators), KafkaSink
(Here maybe we can add ProducerInterceptor-> which saves offset to state 
somehow)

Failure path: (will run before HappyPath to recover data)
RecoverySource-> JobMessageParser(and additional operators)
(Here maybe add Queryable state client which reads offsets from other operator 
state)

Thanks,
Tovi



RE: Sync and Async checkpoint time

2018-01-31 Thread Sofer, Tovi
Hi Stefan,

Thank you for the answer.
So you mean that any use of windows in the stream will result in synchronous 
snapshotting?
When are you planning to fix this?
And is there a workaround?

Thanks again,
Tovi
From: Stefan Richter [mailto:s.rich...@data-artisans.com]
Sent: Tuesday, 30 January 2018 21:10
To: Sofer, Tovi [ICG-IT] <ts72...@imceu.eu.ssmb.com>
Cc: user@flink.apache.org
Subject: Re: Sync and Async checkpoint time

Hi,

this looks like the timer service is the culprit for this problem. Timers are 
currently not stored in the state backend, but in a separate on-heap data 
structure that does not support copy-on-write or async snapshots in general. 
Therefore, writing the timers for a snapshot is always synchronous and this 
explanation would also match your observation that the problem mainly affects 
window operators, which make heavy use of timers.

Best,
Stefan


On 30.01.2018 at 18:17, Sofer, Tovi <tovi.so...@citi.com> wrote:

Hi group,

In our project we are using the asynchronous FsStateBackend, and we are trying to 
move to distributed storage – currently S3.
When using this storage we are experiencing high backpressure and 
high latency, compared to local storage.
We are trying to understand the reason, since the checkpoint is asynchronous, 
so it shouldn’t have such a high effect.

We looked at the checkpoint history in the web UI, and at details from the log.
• From the web UI it seems that the sync checkpoint duration is much higher than 
the async duration. (Again, this is only when using S3, not when using local 
storage.)
This happens especially in window operators (tumbling windows) such as below.
• But from the log the sync time seems very short…

Do you have any estimation why the async write to the FsStateBackend has such a 
high effect on the stream performance?

Checkpoint config:

env.enableCheckpointing(6);
env.setStateBackend(new FsStateBackend(checkpointDirURI, true));

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2);


• Checkpoint info from console:


• Checkpoint info from log:
2018-01-30 07:33:36,416 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-42-thread-1] 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-42-thread-1,5,Flink Task Threads] took 
12139 ms.
2018-01-30 07:33:36,418 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-83] Received acknowledge message for 
checkpoint 52 from task 19ae368e935f177b513577256505ff37 of job 
747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:36,676 INFO  
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [pool-35-thread-1] 
Heap backend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-35-thread-1,5,Flink Task Threads] took 
12396 ms.
2018-01-30 07:33:36,677 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-79] Received acknowledge message for 
checkpoint 52 from task 19bf0464a13f3c9bd559b7559d166de2 of job 
747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:37,347 INFO  
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [pool-17-thread-1] 
Heap backend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-17-thread-1,5,Flink Task Threads] took 
13067 ms.
2018-01-30 07:33:37,349 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-79] Received acknowledge message for 
checkpoint 52 from task 97e169eefde0c57f52a874dee5a0b5a2 of job 
747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:37,418 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-29-thread-1] 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-29-thread-1,5,Flink Task Threads] took 
13143 ms.
2018-01-30 07:33:37,420 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-79] Received acknowledge message for 
checkpoint 52 from task 19b990204530184a97fbaad373dfaf11 of job 
747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:37,508 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-33-thread-1] 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-33-thread-1,5,Flink Task Threads] took 
13234 ms.
2018-01-30 07:33:37,509 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-79] Re

Sync and Async checkpoint time

2018-01-30 Thread Sofer, Tovi
Hi group,

In our project we are using the asynchronous FsStateBackend, and we are trying to 
move to distributed storage - currently S3.
When using this storage we are experiencing high backpressure and 
high latency, compared to local storage.
We are trying to understand the reason, since the checkpoint is asynchronous, 
so it shouldn't have such a high effect.

We looked at the checkpoint history in the web UI, and at details from the log.

* From the web UI it seems that the sync checkpoint duration is much higher than 
the async duration. (Again, this is only when using S3, not when using local 
storage.)
This happens especially in window operators (tumbling windows) such as below.

* But from the log the sync time seems very short...


Do you have any estimation why the async write to the FsStateBackend has such a 
high effect on the stream performance?

Checkpoint config:

env.enableCheckpointing(6);
env.setStateBackend(new FsStateBackend(checkpointDirURI, true));

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2);



* Checkpoint info from console:


* Checkpoint info from log:
2018-01-30 07:33:36,416 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-42-thread-1] 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-42-thread-1,5,Flink Task Threads] took 
12139 ms.
2018-01-30 07:33:36,418 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-83] Received acknowledge message for 
checkpoint 52 from task 19ae368e935f177b513577256505ff37 of job 
747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:36,676 INFO  
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [pool-35-thread-1] 
Heap backend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-35-thread-1,5,Flink Task Threads] took 
12396 ms.
2018-01-30 07:33:36,677 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-79] Received acknowledge message for 
checkpoint 52 from task 19bf0464a13f3c9bd559b7559d166de2 of job 
747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:37,347 INFO  
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [pool-17-thread-1] 
Heap backend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-17-thread-1,5,Flink Task Threads] took 
13067 ms.
2018-01-30 07:33:37,349 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-79] Received acknowledge message for 
checkpoint 52 from task 97e169eefde0c57f52a874dee5a0b5a2 of job 
747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:37,418 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-29-thread-1] 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-29-thread-1,5,Flink Task Threads] took 
13143 ms.
2018-01-30 07:33:37,420 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-79] Received acknowledge message for 
checkpoint 52 from task 19b990204530184a97fbaad373dfaf11 of job 
747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:37,508 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-33-thread-1] 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-33-thread-1,5,Flink Task Threads] took 
13234 ms.
2018-01-30 07:33:37,509 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
[flink-akka.actor.default-dispatcher-79] Received acknowledge message for 
checkpoint 52 from task 5012eda50495ae33ca38e2478b3f8e0d of job 
747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:37,589 INFO  
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - 
[ParsedOrdersDelayWindow (2/4)] Heap backend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
synchronous part) in thread Thread[ParsedOrdersDelayWindow (2/4),5,Flink Task 
Threads] took 1 ms.
2018-01-30 07:33:37,678 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-49-thread-1] 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, 
asynchronous part) in thread Thread[pool-49-thread-1,5,Flink Task Threads] took 
13403 ms.
2018-01-30 07:33:37,680 DEBUG 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 

RE: Two operators consuming from same stream

2018-01-04 Thread Sofer, Tovi
Hi Timo,

Actually I do keyBy in both cases, and in the split/duplicate case I do it on both 
split streams.

I did do the connect below twice and not once, but connect only calls the 
constructor of ConnectedStreams and doesn’t do any real operation.
So I don’t see how it will make a difference.
I can try it if you see a reason.


More detailed code including all keyBy:

Code without duplication looks something like:
KeyedStream orderKeyedStream = ordersStream.keyBy(field);
KeyedStream pricesKeyedStream = pricesStream.keyBy(field);

orderKeyedStream.connect(pricesKeyedStream).flatMap(mapperA);
orderKeyedStream.connect(pricesKeyedStream).flatMap(mapperB);


Code with duplication (better latency):
(We duplicate the streams and then connect pricesStreamA with orderStreamA and 
pricesStreamB with orderStreamB, with keyBy as part of the connect, similar to 
above.)
//duplicate prices streams
SplitStream<Price> pricesSplitStream = pricesStream
        .split(price -> ImmutableList.of("pricesStreamA", "pricesStreamB"));
DataStream<Price> pricesStreamA = pricesSplitStream.select("pricesStreamA");
DataStream<Price> pricesStreamB = pricesSplitStream.select("pricesStreamB");
//duplicate orders streams
SplitStream<Order> ordersSplitStream = ordersStream
        .split(order -> ImmutableList.of("orderStreamA", "orderStreamB"));
DataStream<Order> orderStreamA = ordersSplitStream.select("orderStreamA");
DataStream<Order> orderStreamB = ordersSplitStream.select("orderStreamB");

DataStream priceOrdersConnectedStream = orderStreamA.connect(pricesStreamA)
        .keyBy("priceId", "priceId")
        .flatMap(mapperA);
DataStream orderPricesConnectedStream = orderStreamB.connect(pricesStreamB)
        .keyBy("orderId", "orderId")
        .flatMap(mapperB);



From: Timo Walther [mailto:twal...@apache.org]
Sent: Wednesday, 03 January 2018 11:02
To: user@flink.apache.org
Subject: Re: Two operators consuming from same stream

Hi Tovi,

I think your code without duplication performs two separate shuffle operations 
whereas the other code only performs one shuffle.

Further latency impacts might be due to the overhead involved in maintaining 
the partitioning for a keyed stream/key groups and switching key contexts in 
the operator.

Did you check the latency of the following?

DataStream<> ds = 
orderKeyedStream.connect(pricesKeyedStream).flatMap(identityMapper);
ds.flatMap(mapperA);
ds.flatMap(mapperB);

Regards,
Timo


On 1/1/18 at 2:50 PM, Sofer, Tovi wrote:
Hi group,

We have the following graph below, on which we added metrics for latency 
calculation.
We have two streams which are consumed by two operators:

· ordersStream and pricesStream – they are both consumed by two 
operators: CoMapperA and CoMapperB, each using connect.


Initially we thought that for a stream consumed by two operators we need 
to duplicate the stream into two separate streams, so we did it using split as 
below.
Then we understood it is not a must, and two operators can consume the same 
stream, so we removed the duplication part.
However – when checking latency – we found that latency with duplicated streams 
was much better than without duplication (about twice as good).

My questions:

· Is the improved latency related to checkpointing separately on those 
streams?

· What are the cons of using the duplication if it has better latency? 
Are we harming state correctness in any way?

Additional Info:
The two graphs' configurations appear exactly the same in the execution plan / web UI:



[sourceOrders.keyBy,CoMapperA,OrdersStreams]








[sourcePrices.keyBy,CoMapperB,pricesStreams]







Code without duplication looks something like:
KeyedStream orderKeyedStream = ordersStream.keyBy(field);
KeyedStream pricesKeyedStream = pricesStream.keyBy(field);

orderKeyedStream.connect(pricesKeyedStream).flatMap(mapperA);
orderKeyedStream.connect(pricesKeyedStream).flatMap(mapperB);


Code used for duplication:
(We duplicate the streams and then connect pricesStreamA with orderStreamA and 
pricesStreamB with orderStreamB, with keyBy as part of the connect, similar to 
above.)
//duplicate prices streams
SplitStream<Price> pricesSplitStream = pricesStream
        .split(price -> ImmutableList.of("pricesStreamA", "pricesStreamB"));

DataStream<Price> pricesStreamA = pricesSplitStream.select("pricesStreamA");
DataStream<Price> pricesStreamB = pricesSplitStream.select("pricesStreamB");


Thanks,
Tovi









Two operators consuming from same stream

2018-01-01 Thread Sofer, Tovi
Hi group,

We have the following graph below, on which we added metrics for latency 
calculation.
We have two streams which are consumed by two operators:

* ordersStream and pricesStream - they are both consumed by two 
operators: CoMapperA and CoMapperB, each using connect.


Initially we thought that for a stream consumed by two operators we need 
to duplicate the stream into two separate streams, so we did it using split as 
below.
Then we understood it is not a must, and two operators can consume the same 
stream, so we removed the duplication part.
However - when checking latency - we found that latency with duplicated streams 
was much better than without duplication (about twice as good).

My questions:

* Is the improved latency related to checkpointing separately on those 
streams?

* What are the cons of using the duplication if it has better latency? 
Are we harming state correctness in any way?

Additional Info:
The two graphs' configurations appear exactly the same in the execution plan / web UI:


[sourceOrders.keyBy,CoMapperA,OrdersStreams]





[sourcePrices.keyBy,CoMapperB,pricesStreams]









Code without duplication looks something like:
KeyedStream orderKeyedStream = ordersStream.keyBy(field);
KeyedStream pricesKeyedStream = pricesStream.keyBy(field);

orderKeyedStream.connect(pricesKeyedStream).flatMap(mapperA);
orderKeyedStream.connect(pricesKeyedStream).flatMap(mapperB);


Code used for duplication:
(We duplicate the streams and then connect pricesStreamA with orderStreamA and 
pricesStreamB with orderStreamB, with keyBy as part of the connect, similar to 
above.)
//duplicate prices streams
SplitStream<Price> pricesSplitStream = pricesStream
        .split(price -> ImmutableList.of("pricesStreamA", "pricesStreamB"));

DataStream<Price> pricesStreamA = pricesSplitStream.select("pricesStreamA");
DataStream<Price> pricesStreamB = pricesSplitStream.select("pricesStreamB");


Thanks,
Tovi









RE: slot group indication per operator

2017-12-11 Thread Sofer, Tovi
Hi.

Any update or suggestion on this?

Best regards,
Tovi
From: Timo Walther [mailto:twal...@apache.org]
Sent: Tuesday, 05 December 2017 18:55
To: user@flink.apache.org
Cc: ches...@apache.org
Subject: Re: slot group indication per operator

Hi Tovi,

you are right, it is difficult to check the correct behavior.

@Chesnay: Do you know if we can get this information? If not through the Web 
UI, maybe via REST? Do we have access to the full ExecutionGraph somewhere?

Otherwise it might make sense to open an issue for this.

Regards,
Timo


On 12/5/17 at 4:25 PM, Sofer, Tovi wrote:
Hi all,
I am trying to use the slot group feature, by having ‘default’ group and 
additional ‘market’ group.
The purpose is to divide the resources equally between two sources and their 
following operators.
I’ve set the slotGroup on the source of the market data.
Can I assume that all following operators created from this source will use the 
same slot group, ‘market’?
(The operators created for market stream are pretty complex, with connect and 
split).
In the Web UI I saw there are 16 slots, but didn’t see an indication per operator of 
which group it was assigned to. How can I know?

Relevant Code:

env.setParallelism(8);
conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 16); // to allow parallelism of 8 per group

// Market source and operators:

KeyedStream<SpotTickEvent, Tuple> windowedStreamA = sourceProvider.provide(env)
        .name(spotSourceProvider.getName())
        .slotSharingGroup(SourceMsgType.MARKET.slotGroup())
        .flatMap(new ParserMapper(new MarketMessageParser()))
        .name(ParserMapper.class.getSimpleName())
        .filter(new USDFilter())
        .name(USDFilter.class.getSimpleName())
        .keyBy(MarketEvent.CURRENCY_FIELD)
        .timeWindow(Time.of(windowSizeMs, TimeUnit.MILLISECONDS))
        .process(new LastInWindowPriceChangeFunction())
        .name(LastInWindowPriceChangeFunction.class.getSimpleName())
        .keyBy(SpotTickEvent.CURRENCY_FIELD);

marketConnectedStream = windowedStreamA.connect(windowedStreamB)
        .flatMap(new MarketCoMapper())
        .name(MarketCoMapper.class.getSimpleName());

SplitStream stocksWithSpotsStreams = marketConnectedStream
        .split(market -> ImmutableList.of("splitA", "splitB"));

DataStream<MarketAWithMarketB> splitA = stocksWithSpotsStreams.select("splitA");

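(An editorial note, not part of the original mail: slot sharing groups are inherited from 
the input operators unless set explicitly, so one way to remove the doubt is to pin the 
group on each downstream operator. A sketch using the same hypothetical operators as above:)

sourceProvider.provide(env)
        .slotSharingGroup(SourceMsgType.MARKET.slotGroup())   // source in the "market" group
        .flatMap(new ParserMapper(new MarketMessageParser()))
        .slotSharingGroup(SourceMsgType.MARKET.slotGroup())   // repeated explicitly per operator
        .filter(new USDFilter())
        .slotSharingGroup(SourceMsgType.MARKET.slotGroup());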

Thanks and regards,
Tovi






RE: Testing CoFlatMap correctness

2017-12-10 Thread Sofer, Tovi
Hi Kostas,

Thank you for the suggestion.
But in our case we want to do either a component test that involves several 
steps, where the CoFlatMap is one step in the middle, or an integration test 
that tests the whole flow, which also involves the CoFlatMap.
And we are trying to understand how to test such a scenario so that the results 
are predictable, and so that elements from the main stream arrive after elements 
from the control stream, or the other way around.

Thanks again,
Tovi

From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
Sent: Thursday, 07 December 2017 19:11
To: Sofer, Tovi [ICG-IT] <ts72...@imceu.eu.ssmb.com>
Cc: user@flink.apache.org
Subject: Re: Testing CoFlatMap correctness

Hi Tovi,

What you need is the TwoInputStreamOperatorTestHarness. This will allow you to 
do something like:


TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
  new TwoInputStreamOperatorTestHarness<>(myoperator);

testHarness.setup();
testHarness.open();

testHarness.processWatermark1(new Watermark(17));
testHarness.processWatermark2(new Watermark(17));
testHarness.processElement1(new StreamRecord<>(5, 12L));

testHarness.processWatermark1(new Watermark(42));
testHarness.processWatermark2(new Watermark(42));
testHarness.processElement2(new StreamRecord<>("6", 13L));

and then use testHarness.getOutput() to get the output and compare it against 
the expected one.

If you have access to the Flink source code, I would recommend you to have a 
look at the CoProcessOperatorTest for an example.

Or you can find it here: 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java

Hope this helps,
Kostas



On Dec 7, 2017, at 5:54 PM, Sofer, Tovi <tovi.so...@citi.com> wrote:

Hi group,

What is the best practice for testing CoFlatMap operator correctness?
We have two source functions, each emitting data to a stream, and a connect between 
them, and I want to make sure that when a streamA element arrives before a streamB 
element, a certain behavior happens.
How can I test this case?

Using env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

and emitting a timestamp and watermark per element didn’t help; each 
element still arrives in an unexpected order.



Thanks in advance,

Tovi



Testing CoFlatMap correctness

2017-12-07 Thread Sofer, Tovi
Hi group,

What is the best practice for testing CoFlatMap operator correctness?
We have two source functions, each emitting data to a stream, and a connect between 
them, and I want to make sure that when a streamA element arrives before a streamB 
element, a certain behavior happens.
How can I test this case?

Using env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

and emitting a timestamp and watermark per element didn't help; each 
element still arrives in an unexpected order.



Thanks in advance,

Tovi





slot group indication per operator

2017-12-05 Thread Sofer, Tovi
Hi all,
I am trying to use the slot group feature, by having 'default' group and 
additional 'market' group.
The purpose is to divide the resources equally between two sources and their 
following operators.
I've set the slotGroup on the source of the market data.
Can I assume that all following operators created from this source will use the 
same slot group, 'market'?
(The operators created for market stream are pretty complex, with connect and 
split).
In the Web UI I saw there are 16 slots, but didn't see an indication per operator of 
which group it was assigned to. How can I know?

Relevant Code:

env.setParallelism(8);
conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 16); // to allow parallelism of 8 per group

// Market source and operators:

KeyedStream windowedStreamA = sourceProvider.provide(env)
.name(spotSourceProvider.getName())
.slotSharingGroup(SourceMsgType.MARKET.slotGroup())
.flatMap(new ParserMapper(new MarketMessageParser()))
.name(ParserMapper.class.getSimpleName())
.filter(new USDFilter())
.name(USDFilter.class.getSimpleName())
.keyBy(MarketEvent.CURRENCY_FIELD)
.timeWindow(Time.of(windowSizeMs, TimeUnit.MILLISECONDS))
.process(new LastInWindowPriceChangeFunction())
.name(LastInWindowPriceChangeFunction.class.getSimpleName())
.keyBy(SpotTickEvent.CURRENCY_FIELD);


marketConnectedStream = windowedStreamA.connect(windowedStreamB)
.flatMap(new MarketCoMapper())
.name(MarketCoMapper.class.getSimpleName())



SplitStream stocksWithSpotsStreams = marketConnectedStream
.split( market -> ImmutableList.of("splitA", "splitB") );

DataStream<MarketAWithMarketB> splitA = stocksWithSpotsStreams.select("splitA");


Thanks and regards,
Tovi




RE: Negative values using latency marker

2017-11-05 Thread Sofer, Tovi
Hi Nico, 

Actually the run below is on my local machine, and both Kafka and Flink run on 
it.

Thanks and regards,
Tovi
-Original Message-
From: Nico Kruber [mailto:n...@data-artisans.com] 
Sent: Friday, 03 November 2017 15:22
To: user@flink.apache.org
Cc: Sofer, Tovi [ICG-IT] <ts72...@imceu.eu.ssmb.com>
Subject: Re: Negative values using latency marker

Hi Tovi,
if I see this correctly, the LatencyMarker gets its initial timestamp during 
creation at the source and the latency is reported as a metric at a sink by 
comparing the initial timestamp with the current time.
If the clocks between the two machines involved diverge, e.g. the sinks clock 
falling behind, the difference may be negative.


Nico

On Thursday, 2 November 2017 17:58:51 CET Sofer, Tovi  wrote:
> Hi group,
> 
> Can someone maybe elaborate how the latency gauge shown by the latency 
> marker can be negative?
> 
> 2017-11-02 18:54:56,842 INFO
> com.citi.artemis.flink.reporters.ArtemisReporter - 
> [Flink-MetricRegistry-1] 
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.Sink: FinalSink.0.latency: {LatencySourceDescriptor{vertexID=1,
> subtaskIndex=0}={p99=-5.0, p50=-5.0, min=-5.0, max=-5.0, p95=-5.0, 
> mean=-5.0}, LatencySourceDescriptor{vertexID=1, 
> subtaskIndex=1}={p99=-5.0, p50=-5.0, min=-5.0, max=-5.0, p95=-5.0, 
> mean=-5.0}, LatencySourceDescriptor{vertexID=1, 
> subtaskIndex=2}={p99=-6.0, p50=-6.0, min=-6.0, max=-6.0, p95=-6.0, 
> mean=-6.0}, LatencySourceDescriptor{vertexID=1, 
> subtaskIndex=3}={p99=-6.0, p50=-6.0, min=-6.0, max=-6.0, p95=-6.0, 
> mean=-6.0}} 2017-11-02 18:54:56,843 INFO 
> com.citi.artemis.flink.reporters.ArtemisReporter - 
> [Flink-MetricRegistry-1] 
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.AverageE2ELatencyChecker.0.60SecWarmUpRecordsCounter: 2858446
> 2017-11-02 18:54:56,843 INFO
> com.citi.artemis.flink.reporters.ArtemisReporter - 
> [Flink-MetricRegistry-1] 
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.Source: fixTopicConsumerSource.3.numRecordsOut: 1954784 2017-11-02
> 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - 
> [Flink-MetricRegistry-1] 
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.AverageE2ELatencyChecker.0.ActualRecordsCounter: 4962675 
> 2017-11-02
> 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - 
> [Flink-MetricRegistry-1] 
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.AverageE2ELatencyChecker.0.AverageLatencyMs: 0.0753785 2017-11-02
> 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - 
> [Flink-MetricRegistry-1] 
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.AverageE2ELatencyChecker.0.HighLatencyMsgPercentage: 0.5918576
> 2017-11-02 18:54:56,843 INFO
> com.citi.artemis.flink.reporters.ArtemisReporter - 
> [Flink-MetricRegistry-1] 
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.Source: fixTopicConsumerSource.0.numRecordsOutPerSecond:
> 12943.1167 2017-11-02 18:54:56,843 INFO 
> com.citi.artemis.flink.reporters.ArtemisReporter - 
> [Flink-MetricRegistry-1] 
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.AverageE2ELatencyChecker.0.numRecordsInPerSecond: 51751.4 
> 2017-11-02
> 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - 
> [Flink-MetricRegistry-1] 
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.Source: fixTopicConsumerSource.3.numRecordsOutPerSecond: 12935.05
> 2017-11-02 18:54:56,843 INFO
> com.citi.artemis.flink.reporters.ArtemisReporter - 
> [Flink-MetricRegistry-1] 
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.Source: fixTopicConsumerSource.2.numRecordsOutPerSecond:
> 12946.9166 2017-11-02 18:54:56,843 INFO 
> com.citi.artemis.flink.reporters.ArtemisReporter - 
> [Flink-MetricRegistry-1] 
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.Source: fixTopicConsumerSource.1.numRecordsOutPerSecond:
> 12926.3168 2017-11-02 18:54:56,844 INFO 
> com.citi.artemis.flink.reporters.ArtemisReporter - 
> [Flink-MetricRegistry-1] 
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.AverageE2ELatencyChecker.0.LatencyHistogram: count:1 min:24753
> max:19199891 mean:77637.6484 stddev:341333.9414842662 p50:40752.0
> p75:49809.0 p95:190480.95 p98:539110.819994 p99:749224.889995
> p999:3817927.9259998496
> 
> Regards,
> Tovi



Negative values using latency marker

2017-11-02 Thread Sofer, Tovi
Hi group,

Can someone maybe elaborate how the latency gauge shown by the latency marker can be 
negative?

2017-11-02 18:54:56,842 INFO  com.citi.artemis.flink.reporters.ArtemisReporter 
- [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming 
Job.Sink: FinalSink.0.latency: {LatencySourceDescriptor{vertexID=1, 
subtaskIndex=0}={p99=-5.0, p50=-5.0, min=-5.0, max=-5.0, p95=-5.0, mean=-5.0}, 
LatencySourceDescriptor{vertexID=1, subtaskIndex=1}={p99=-5.0, p50=-5.0, 
min=-5.0, max=-5.0, p95=-5.0, mean=-5.0}, LatencySourceDescriptor{vertexID=1, 
subtaskIndex=2}={p99=-6.0, p50=-6.0, min=-6.0, max=-6.0, p95=-6.0, mean=-6.0}, 
LatencySourceDescriptor{vertexID=1, subtaskIndex=3}={p99=-6.0, p50=-6.0, 
min=-6.0, max=-6.0, p95=-6.0, mean=-6.0}}
2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter 
- [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming 
Job.AverageE2ELatencyChecker.0.60SecWarmUpRecordsCounter: 2858446
2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter 
- [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming 
Job.Source: fixTopicConsumerSource.3.numRecordsOut: 1954784
2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter 
- [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming 
Job.AverageE2ELatencyChecker.0.ActualRecordsCounter: 4962675
2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter 
- [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming 
Job.AverageE2ELatencyChecker.0.AverageLatencyMs: 0.0753785
2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter 
- [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming 
Job.AverageE2ELatencyChecker.0.HighLatencyMsgPercentage: 0.5918576
2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter 
- [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming 
Job.Source: fixTopicConsumerSource.0.numRecordsOutPerSecond: 12943.1167
2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter 
- [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming 
Job.AverageE2ELatencyChecker.0.numRecordsInPerSecond: 51751.4
2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter 
- [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming 
Job.Source: fixTopicConsumerSource.3.numRecordsOutPerSecond: 12935.05
2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter 
- [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming 
Job.Source: fixTopicConsumerSource.2.numRecordsOutPerSecond: 12946.9166
2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter 
- [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming 
Job.Source: fixTopicConsumerSource.1.numRecordsOutPerSecond: 12926.3168
2017-11-02 18:54:56,844 INFO  com.citi.artemis.flink.reporters.ArtemisReporter 
- [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming 
Job.AverageE2ELatencyChecker.0.LatencyHistogram: count:1 min:24753 
max:19199891 mean:77637.6484 stddev:341333.9414842662 p50:40752.0 p75:49809.0 
p95:190480.95 p98:539110.819994 p99:749224.889995 
p999:3817927.9259998496

Regards,
Tovi



RE: state size effects latency

2017-10-30 Thread Sofer, Tovi
Hi Biplob,

We have created our own latency meter histogram, which measures the latency 
from ingestion time till the last operator.
This is shown in the log below (99th percentile and mean value), and our 
estimates are based on it.
The latency you mentioned is from the checkpoint tab, which shows checkpoint 
latency. It is different from record latency.
Actually we were also trying to use the LatencyMarker, but didn’t know how to 
get the E2E latency from it in a simple manner.
2017-10-26 07:26:55,030 INFO  com.citi.artemis.flink.reporters.Log4JReporter - 
[Flink-MetricRegistry-1] 
localhost.taskmanager.6afd21aeb9b9bef41a4912b023469497.Flink Streaming 
Job.AverageE2ELatencyChecker.0.LatencyHistogram: count:1 min:31919 
max:13481166 mean:89492.0644 stddev:265876.0259763816 p50:68140.5 p75:82152.5 
p95:146654.04 p98:204671.74 p99:308958.733 
p999:3844154.002999794
Tovi

From: Biplob Biswas [mailto:revolutioni...@gmail.com]
Sent: Monday, 30 October 2017 11:02
To: Sofer, Tovi [ICG-IT] <ts72...@imceu.eu.ssmb.com>
Cc: Narendra Joshi <narendr...@gmail.com>; user <user@flink.apache.org>
Subject: Re: state size effects latency

Hi Tovi,

This might seem a really naive question (and it's neither a solution nor an answer 
to your question) but I am trying to understand how latency is viewed. You 
said you achieved less than 5 ms latency, and that for the 99th percentile you 
achieved 0.3 and 9 ms respectively; what kind of latency is this? Specific 
operator latency? Because the end-to-end latency is around 50 ms and 370 ms.

Was just curious how latency is seen from a different perspective, would really 
help me in my understanding.

Thanks a lot,
Biplob

Thanks & Regards
Biplob Biswas

On Mon, Oct 30, 2017 at 8:53 AM, Sofer, Tovi <tovi.so...@citi.com> wrote:
Thank you Joshi.
We are currently using FsStateBackend, since in version 1.3 it supports async 
snapshots, and we are not using RocksDB.

Does anyone else have feedback on these issues?

From: Narendra Joshi [mailto:narendr...@gmail.com]
Sent: Sunday, 29 October 2017 12:13
To: Sofer, Tovi [ICG-IT] <ts72...@imceu.eu.ssmb.com>
Cc: user <user@flink.apache.org>
Subject: Re: state size effects latency


We have also faced similar issues. The only thing that happens synchronously when 
using async snapshots is getting a persistent point-in-time picture, which in the 
case of the RocksDB backend means making symlinks. That would increase linearly with 
the number of files to symlink, but this should be negligible. We could not find a 
satisfying reason for the increase in latency with state size.

Best,
Narendra

Narendra Joshi
On 29 Oct 2017 15:04, "Sofer, Tovi" <tovi.so...@citi.com> wrote:
Hi all,

In our application we have a requirement for very low latency, preferably less 
than 5ms.
We were able to achieve this so far, but when we start increasing the state 
size, we see a distinct degradation in latency.
We have added MinPauseBetweenCheckpoints, and are using async snapshots.

• Why does state size have such a distinct effect on latency? How can 
this effect be minimized?

• Can the state snapshot be done using separate threads and resources 
in order to have less effect on stream data handling?


Details:

Application configuration:
env.enableCheckpointing(1000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
env.setStateBackend(new FsStateBackend(checkpointDirURI, true)); // use async 
snapshots
env.setParallelism (16) ; //running on machine with 40 cores

Results:


A.  When state size is ~20MB we got latency of 0.3 ms for the 99th 
percentile

Latency info: (in nanos)
2017-10-26 07:26:55,030 INFO  com.citi.artemis.flink.reporters.Log4JReporter - 
[Flink-MetricRegistry-1] 
localhost.taskmanager.6afd21aeb9b9bef41a4912b023469497.Flink Streaming 
Job.AverageE2ELatencyChecker.0.LatencyHistogram: count:1 min:31919 
max:13481166 mean:89492.0644 stddev:265876.0259763816 p50:68140.5 p75:82152.5 
p95:146654.04 p98:204671.74 p99:308958.733 
p999:3844154.002999794
State\checkpoint info:





B.  When state size is ~200MB, latency significantly degraded to 9 ms 
for the 99th percentile
Latency info:
2017-10-26 07:17:35,289 INFO  com.citi.artemis.flink.reporters.Log4JReporter - 
[Flink-MetricRegistry-1] 
localhost.taskmanager.05431e7ecab1888b2792265cdc0ddf84.Flink Streaming 
Job.AverageE2ELatencyChecker.0.LatencyHistogram: count:1 min:30186 
max:46236470 mean:322105.7072 stddev:2060373.4782505725 p50:68979.5 
p75:85780.25 p95:219882.614 p98:2360171.439934 
p99:9251766.55945 p999:3.956163987499886E7
State\checkpoint info:



Thanks and regards,
Tovi




RE: state size effects latency

2017-10-30 Thread Sofer, Tovi
Thank you Joshi.
We are currently using FsStateBackend, since in version 1.3 it supports async 
snapshots, and we are not using RocksDB.

Does anyone else have feedback on these issues?

From: Narendra Joshi [mailto:narendr...@gmail.com]
Sent: Sunday, 29 October 2017 12:13
To: Sofer, Tovi [ICG-IT] <ts72...@imceu.eu.ssmb.com>
Cc: user <user@flink.apache.org>
Subject: Re: state size effects latency


We have also faced similar issues. The only thing that happens synchronously when 
using async snapshots is getting a persistent point-in-time picture, which in the 
case of the RocksDB backend means making symlinks. That would increase linearly with 
the number of files to symlink, but this should be negligible. We could not find a 
satisfying reason for the increase in latency with state size.

Best,
Narendra

Narendra Joshi
On 29 Oct 2017 15:04, "Sofer, Tovi" <tovi.so...@citi.com> wrote:
Hi all,

In our application we have a requirement for very low latency, preferably less 
than 5ms.
We were able to achieve this so far, but when we start increasing the state 
size, we see a distinct degradation in latency.
We have added MinPauseBetweenCheckpoints, and are using async snapshots.

• Why does state size have such a distinct effect on latency? How can 
this effect be minimized?

• Can the state snapshot be done using separate threads and resources 
in order to have less effect on stream data handling?


Details:

Application configuration:
env.enableCheckpointing(1000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
env.setStateBackend(new FsStateBackend(checkpointDirURI, true)); // use async 
snapshots
env.setParallelism (16) ; //running on machine with 40 cores

Results:


A.  When state size is ~20MB we got latency of 0.3 ms for the 99th 
percentile

Latency info: (in nanos)
2017-10-26 07:26:55,030 INFO  com.citi.artemis.flink.reporters.Log4JReporter - 
[Flink-MetricRegistry-1] 
localhost.taskmanager.6afd21aeb9b9bef41a4912b023469497.Flink Streaming 
Job.AverageE2ELatencyChecker.0.LatencyHistogram: count:1 min:31919 
max:13481166 mean:89492.0644 stddev:265876.0259763816 p50:68140.5 p75:82152.5 
p95:146654.04 p98:204671.74 p99:308958.733 
p999:3844154.002999794
State\checkpoint info:





B.  When state size is ~200MB, latency significantly degraded to 9 ms 
for the 99th percentile
Latency info:
2017-10-26 07:17:35,289 INFO  com.citi.artemis.flink.reporters.Log4JReporter - 
[Flink-MetricRegistry-1] 
localhost.taskmanager.05431e7ecab1888b2792265cdc0ddf84.Flink Streaming 
Job.AverageE2ELatencyChecker.0.LatencyHistogram: count:1 min:30186 
max:46236470 mean:322105.7072 stddev:2060373.4782505725 p50:68979.5 
p75:85780.25 p95:219882.614 p98:2360171.439934 
p99:9251766.55945 p999:3.956163987499886E7
State\checkpoint info:



Thanks and regards,
Tovi



state size effects latency

2017-10-29 Thread Sofer, Tovi
Hi all,

In our application we have a requirement for very low latency, preferably less 
than 5ms.
We were able to achieve this so far, but when we start increasing the state 
size, we see a distinct degradation in latency.
We have added MinPauseBetweenCheckpoints, and are using async snapshots.

* Why does state size have such a distinct effect on latency? How can 
this effect be minimized?

* Can the state snapshot be done using separate threads and resources 
in order to have less effect on stream data handling?


Details:

Application configuration:
env.enableCheckpointing(1000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
env.setStateBackend(new FsStateBackend(checkpointDirURI, true)); // use async 
snapshots
env.setParallelism (16) ; //running on machine with 40 cores

Results:


A.  When state size is ~20MB we got latency of 0.3 ms for the 99th 
percentile

Latency info: (in nanos)
2017-10-26 07:26:55,030 INFO  com.citi.artemis.flink.reporters.Log4JReporter - 
[Flink-MetricRegistry-1] 
localhost.taskmanager.6afd21aeb9b9bef41a4912b023469497.Flink Streaming 
Job.AverageE2ELatencyChecker.0.LatencyHistogram: count:1 min:31919 
max:13481166 mean:89492.0644 stddev:265876.0259763816 p50:68140.5 p75:82152.5 
p95:146654.04 p98:204671.74 p99:308958.733 
p999:3844154.002999794
State\checkpoint info:

[screenshot attachment omitted from the archive]




B.  When state size is ~200MB, latency significantly increased to 9 ms for the 
99th percentile
Latency info:
2017-10-26 07:17:35,289 INFO  com.citi.artemis.flink.reporters.Log4JReporter - 
[Flink-MetricRegistry-1] 
localhost.taskmanager.05431e7ecab1888b2792265cdc0ddf84.Flink Streaming 
Job.AverageE2ELatencyChecker.0.LatencyHistogram: count:1 min:30186 
max:46236470 mean:322105.7072 stddev:2060373.4782505725 p50:68979.5 
p75:85780.25 p95:219882.614 p98:2360171.439934 
p99:9251766.55945 p999:3.956163987499886E7
State\checkpoint info:


[screenshot attachment omitted from the archive]

Thanks and regards,
Tovi



RE: kafka consumer parallelism

2017-10-03 Thread Sofer, Tovi
Hi Robert,

I had a similar issue.
For me the problem was that the topic was auto-created with one partition.
You can alter it to have 5 partitions using the kafka-topics command.
Example: 
kafka-topics --alter  --partitions 5 --topic fix --zookeeper localhost:2181 

Regards,
Tovi
-Original Message-
From: Timo Walther [mailto:twal...@apache.org] 
Sent: Monday, 02 October 2017 20:59
To: user@flink.apache.org
Subject: Re: kafka consumer parallelism

Hi,

I'm not a Kafka expert, but I think you need more than 1 Kafka partition 
to process multiple documents at the same time. Also make sure to send the 
documents to different partitions.
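
For illustration, a minimal sketch of sending keyed records so that Kafka's default 
partitioner spreads them over all partitions of a multi-partition topic (the broker 
address and topic name here are assumptions, not taken from your job):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    for (int i = 0; i < 3; i++) {
        // distinct keys are hashed across the topic's partitions by the DefaultPartitioner
        producer.send(new ProducerRecord<>("documents", "doc-" + i, "payload-" + i));
    }
}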

Regards,
Timo


On 10/2/17 at 6:46 PM, r. r. wrote:
> Hello
> I'm running a job with "flink run -p5" and additionally set 
> env.setParallelism(5).
> The source of the stream is Kafka, the job uses FlinkKafkaConsumer010.
> In the Flink UI, though, I notice that if I send 3 documents to Kafka, only one 
> 'instance' of the consumer seems to receive Kafka's records and send them to 
> the next operators, which according to the Flink UI are properly parallelized.
> What's the explanation of this behavior?
> According to sources:
>
> To enable parallel execution, the user defined source should
>       * implement {@link org.apache.flink.streaming.api.functions.source.ParallelSourceFunction} or extend {@link
>       * org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction}
> which FlinkKafkaConsumer010 does
>
> Please check a screenshot at https://imgur.com/a/E1H9r - you'll see that only one sends 3 
> records to the sinks
>
> My code is here: https://pastebin.com/yjYCXAAR
>
> Thanks!




RE: Flink kafka consumer that read from two partitions in local mode

2017-09-26 Thread Sofer, Tovi
Hi,

The issue was solved.
Following your guidance on the producer side, I checked in Kafka and saw that the topic 
was created with one partition.
I re-created it with two partitions manually, and that fixed the problem.

// update in KAFKA_HOME/config/server.properties : set delete.topic.enable=true
%KAFKA_HOME%\bin\windows\kafka-topics.bat --delete  --topic fix --zookeeper 
localhost:2181
%KAFKA_HOME%\bin\windows\kafka-topics.bat --create --partitions 2 --topic fix 
--zookeeper localhost:2181 --replication-factor 1
%KAFKA_HOME%\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

A follow-up question: is it possible to create the topic with two partitions 
while creating the FlinkKafkaProducer?
By default it seems to be created with one partition.
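
One option would be to pre-create the topic from code before submitting the job, since 
the producer itself only relies on the broker's auto-create defaults. A minimal sketch, 
assuming a kafka-clients version that ships AdminClient (0.11+, newer than the 0.10.2.1 
client used here) and omitting exception handling:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

Properties adminProps = new Properties();
adminProps.put("bootstrap.servers", "localhost:9092");
try (AdminClient admin = AdminClient.create(adminProps)) {
    // topic "fix" with 2 partitions and replication factor 1
    admin.createTopics(Collections.singletonList(new NewTopic("fix", 2, (short) 1))).all().get();
}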

Thanks and regards,
Tovi

From: Sofer, Tovi [ICG-IT]
Sent: Monday, 25 September 2017 17:18
To: 'Tzu-Li (Gordon) Tai'; Fabian Hueske
Cc: user
Subject: RE: Flink kafka consumer that read from two partitions in local mode

Hi Gordon,

Thanks for your assistance.


· We are currently running Flink in local mode (MiniCluster), using 
Flink 1.3.2 and flink-connector-kafka-0.10_2.10.


· In the consumer log I see only 1 partition (when parallelism=1), so the 
problem indeed seems to be in the producer.
2017-09-25 17:10:58,140 WARN  org.apache.kafka.clients.consumer.ConsumerConfig 
- [Source: fix_topic -> FixMapper (1/1)] The configuration 'topic.name' was 
supplied but isn't a known config.
2017-09-25 17:10:58,143 INFO  org.apache.kafka.common.utils.AppInfoParser - 
[Source: fix_topic -> FixMapper (1/1)] Kafka version : 0.10.2.1
2017-09-25 17:10:58,144 INFO  org.apache.kafka.common.utils.AppInfoParser - 
[Source: fix_topic -> FixMapper (1/1)] Kafka commitId : e89bffd6b2eff799
2017-09-25 17:10:58,679 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - [Source: 
fix_topic -> FixMapper (1/1)] Got 1 partitions from these topics: [fix]
2017-09-25 17:10:58,679 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - [Source: 
fix_topic -> FixMapper (1/1)] Consumer is going to read the following topics 
(with number of partitions): fix (1),
2017-09-25 17:10:58,680 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - [Source: 
fix_topic -> FixMapper (1/1)] Consumer subtask 0 will start reading the 
following 1 partitions from the committed group offsets in Kafka: 
[KafkaTopicPartition{topic='fix', partition=0}]


· The producer seems to write to one partition only.

internalProducer.topicPartitionsMap and Cluster.Partitions seem to have one 
partition for the FIX topic.



In the producer log, each producer starts with the configuration below:

2017-09-25 17:06:49,596 INFO  org.apache.kafka.clients.producer.ProducerConfig- 
[Source: random -> Sink: fixTopicSink (2/2)] ProducerConfig values:

acks = 1

batch.size = 16384

block.on.buffer.full = false

bootstrap.servers = [localhost:9092]

buffer.memory = 33554432

client.id =

compression.type = none

connections.max.idle.ms = 54

interceptor.classes = null

key.serializer = class 
org.apache.kafka.common.serialization.ByteArraySerializer

linger.ms = 0

max.block.ms = 6

max.in.flight.requests.per.connection = 5

max.request.size = 1048576

metadata.fetch.timeout.ms = 6

metadata.max.age.ms = 30

metric.reporters = []

metrics.num.samples = 2

metrics.sample.window.ms = 3

partitioner.class = class 
org.apache.kafka.clients.producer.internals.DefaultPartitioner

receive.buffer.bytes = 32768

reconnect.backoff.ms = 50

request.timeout.ms = 3

retries = 0

retry.backoff.ms = 100

sasl.jaas.config = null

sasl.kerberos.kinit.cmd = /usr/bin/kinit

sasl.kerberos.min.time.before.relogin = 6

sasl.kerberos.service.name = null

sasl.kerberos.ticket.renew.jitter = 0.05

sasl.kerberos.ticket.renew.window.factor = 0.8

sasl.mechanism = GSSAPI

security.protocol = PLAINTEXT

send.buffer.bytes = 131072

ssl….

timeout.ms = 3

value.serializer = class 
org.apache.kafka.common.serialization.ByteArraySerializer

And prints when starting:

2017-09-25 17:07:46,907 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase- [Source: 
random -> Sink: fixTopicSink (2/2)] Starting FlinkKafkaProducer (2/2) to 
produce into default topic fix


Thanks,
Tovi

From: Tzu-Li (Gordon) Tai [mailto:tzuli...@apache.org]
Sent: Monday, 25 Sep

RE: Flink kafka consumer that read from two partitions in local mode

2017-09-25 Thread Sofer, Tovi
Hi Gordon,

Thanks for your assistance.


· We are currently running Flink in local mode (MiniCluster), using 
Flink 1.3.2 and flink-connector-kafka-0.10_2.10.


· In the consumer log I see only 1 partition (when parallelism=1), so the 
problem indeed seems to be in the producer.
2017-09-25 17:10:58,140 WARN  org.apache.kafka.clients.consumer.ConsumerConfig 
- [Source: fix_topic -> FixMapper (1/1)] The configuration 'topic.name' was 
supplied but isn't a known config.
2017-09-25 17:10:58,143 INFO  org.apache.kafka.common.utils.AppInfoParser - 
[Source: fix_topic -> FixMapper (1/1)] Kafka version : 0.10.2.1
2017-09-25 17:10:58,144 INFO  org.apache.kafka.common.utils.AppInfoParser - 
[Source: fix_topic -> FixMapper (1/1)] Kafka commitId : e89bffd6b2eff799
2017-09-25 17:10:58,679 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - [Source: 
fix_topic -> FixMapper (1/1)] Got 1 partitions from these topics: [fix]
2017-09-25 17:10:58,679 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - [Source: 
fix_topic -> FixMapper (1/1)] Consumer is going to read the following topics 
(with number of partitions): fix (1),
2017-09-25 17:10:58,680 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - [Source: 
fix_topic -> FixMapper (1/1)] Consumer subtask 0 will start reading the 
following 1 partitions from the committed group offsets in Kafka: 
[KafkaTopicPartition{topic='fix', partition=0}]


· The producer seems to write to one partition only.

internalProducer.topicPartitionsMap and Cluster.Partitions seem to have one 
partition for the FIX topic.



In the producer log, each producer starts with the configuration below:

2017-09-25 17:06:49,596 INFO  org.apache.kafka.clients.producer.ProducerConfig- 
[Source: random -> Sink: fixTopicSink (2/2)] ProducerConfig values:

acks = 1

batch.size = 16384

block.on.buffer.full = false

bootstrap.servers = [localhost:9092]

buffer.memory = 33554432

client.id =

compression.type = none

connections.max.idle.ms = 54

interceptor.classes = null

key.serializer = class 
org.apache.kafka.common.serialization.ByteArraySerializer

linger.ms = 0

max.block.ms = 6

max.in.flight.requests.per.connection = 5

max.request.size = 1048576

metadata.fetch.timeout.ms = 6

metadata.max.age.ms = 30

metric.reporters = []

metrics.num.samples = 2

metrics.sample.window.ms = 3

partitioner.class = class 
org.apache.kafka.clients.producer.internals.DefaultPartitioner

receive.buffer.bytes = 32768

reconnect.backoff.ms = 50

request.timeout.ms = 3

retries = 0

retry.backoff.ms = 100

sasl.jaas.config = null

sasl.kerberos.kinit.cmd = /usr/bin/kinit

sasl.kerberos.min.time.before.relogin = 6

sasl.kerberos.service.name = null

sasl.kerberos.ticket.renew.jitter = 0.05

sasl.kerberos.ticket.renew.window.factor = 0.8

sasl.mechanism = GSSAPI

security.protocol = PLAINTEXT

send.buffer.bytes = 131072

ssl….

timeout.ms = 3

value.serializer = class 
org.apache.kafka.common.serialization.ByteArraySerializer

And prints when starting:

2017-09-25 17:07:46,907 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase- [Source: 
random -> Sink: fixTopicSink (2/2)] Starting FlinkKafkaProducer (2/2) to 
produce into default topic fix


Thanks,
Tovi

From: Tzu-Li (Gordon) Tai [mailto:tzuli...@apache.org]
Sent: Monday, 25 September 2017 15:06
To: Sofer, Tovi [ICG-IT]; Fabian Hueske
Cc: user
Subject: RE: Flink kafka consumer that read from two partitions in local mode

Hi Tovi,

Your code seems to be correct, and as Fabian described, you don’t need 
parallelism of 2 to read 2 partitions; a single parallel instance of the source 
can read multiple partitions.

I’m not sure what could have possibly gone wrong at the moment from a first 
look, so I may need to randomly ask you some questions:

Could you let me know which Flink version you are on?
Also, could you try searching in the log to see if you find consumer logs such 
as:
“Consumer subtask ... will start reading the following (numPartitions) 
partitions from ...: (partition info) "
You can try setting parallelism of the source to 1, and you should see that the 
subtask is reading 2 partitions.
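
For example, with the source from your consumer setup (a sketch only, reusing the variable 
names from your code):

// a single source subtask; its log should show it reading both partitions of the topic
DataStream<Tuple2<Long, String>> fixMessagesStream =
    env.addSource(kafkaConsumer).name("fix_topic").setParallelism(1);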

From the metrics log it does seem like the consumer has picked up both 
partitions 0 and 1, but no records seem to be coming from partition 0. 

RE: Flink kafka consumer that read from two partitions in local mode

2017-09-24 Thread Sofer, Tovi
Thank you Fabian.

Fabian, Gordon, am I missing something in consumer setup?
Should I configure consumer in some way to subscribe to two partitions?

Thanks and regards,
Tovi

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Tuesday, 19 September 2017 22:58
To: Sofer, Tovi [ICG-IT]
Cc: user; Tzu-Li (Gordon) Tai
Subject: Re: Flink kafka consumer that read from two partitions in local mode

Hi Tovi,
your code looks OK to me. Maybe Gordon (in CC) has an idea what is going wrong.
Just a side note: you don't need to set the parallelism to 2 to read from two 
partitions. A single consumer instance can read from multiple partitions.
Best,
Fabian

2017-09-19 17:02 GMT+02:00 Sofer, Tovi 
<tovi.so...@citi.com>:
Hi,

I am trying to set up a FlinkKafkaConsumer that reads from two partitions in 
local mode, using setParallelism=2.
The producer writes to two partitions (as shown in the metrics report).
But the consumer seems to always read from one partition only.
Am I missing something in partition configuration?

Code:


Producer setup:
Configuration localConfig = new Configuration();
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(parallelism, localConfig);

env.setParallelism(2);

String kafkaPort = 
parameters.get(SimulatorConfig.ParamsEnum.KAFKA_PORT.fullName());

SingleOutputStreamOperator fixMsgSource = 
env.addSource(srcMsgProvider.getFixMsgSource(), 
TypeInformation.of(String.class)).name(sourceGenerationType.getValue());
fixMsgSource.addSink(new FlinkKafkaProducer010<>("localhost:"  + kafkaPort, 
TOPIC_NAME, new SimpleStringSchema()))

.name("fix_topic");

env.execute("MsgSimulatorJob");


Consumer setup:

String topicName = "fix";
Configuration conf = new Configuration();
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

env.setParallelism(2);
env.getConfig().setGlobalJobParameters(configParams); // make parameters 
available in the web interface
DeserializationSchema<Tuple2<Long, String>> deserializationSchema = new 
SimpleStringAndTimestampDeserializationSchema ();
FlinkKafkaConsumer010<Tuple2<Long, String>> kafkaConsumer = new 
FlinkKafkaConsumer010<>(topicName, deserializationSchema, 
kafkaParams.getProperties());

DataStream<Tuple2<Long, String>> fixMessagesStream = 
env.addSource(kafkaConsumer).name("fix_topic").setParallelism(2);

As you can see in the output, only 1 consumer partition seems to be used:
Producer output:
2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Source: 
random -> Sink: fix_topic.1.numRecordsInPerSecond: 0.0
2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Sink: 
fix_topic.1.numRecordsInPerSecond: 19836.0333
2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Sink: 
fix_topic.0.numRecordsInPerSecond: 20337.9334
2017-09-19 14:40:45,819 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Source: 
random -> Sink: fix_topic.0.numRecordsInPerSecond: 0.0
Consumer output:
2017-09-19 14:40:45,116 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.KafkaConsumer.select-rate: 1928.1421153709368
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.KafkaConsumer.commit-rate: 0.21623491761449637
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.KafkaConsumer.outgoing-byte-rate: 982.0051413881748
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.KafkaConsumer.sync-rate: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.KafkaConsumer.io-ratio: 0.01148712465103046
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.numRecordsOutPerSecond: 6625.2666
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.numRecordsInPerSecond: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.numBytesOutPerSecond: 1.40222884E7
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a

Flink kafka consumer that read from two partitions in local mode

2017-09-19 Thread Sofer, Tovi
Hi,

I am trying to set up a FlinkKafkaConsumer that reads from two partitions in 
local mode, using setParallelism=2.
The producer writes to two partitions (as shown in the metrics report).
But the consumer seems to always read from one partition only.
Am I missing something in partition configuration?

Code:


Producer setup:
Configuration localConfig = new Configuration();
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(parallelism, localConfig);

env.setParallelism(2);

String kafkaPort = 
parameters.get(SimulatorConfig.ParamsEnum.KAFKA_PORT.fullName());

SingleOutputStreamOperator fixMsgSource = 
env.addSource(srcMsgProvider.getFixMsgSource(), 
TypeInformation.of(String.class)).name(sourceGenerationType.getValue());
fixMsgSource.addSink(new FlinkKafkaProducer010<>("localhost:"  + kafkaPort, 
TOPIC_NAME, new SimpleStringSchema()))

.name("fix_topic");

env.execute("MsgSimulatorJob");


Consumer setup:

String topicName = "fix";
Configuration conf = new Configuration();
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

env.setParallelism(2);
env.getConfig().setGlobalJobParameters(configParams); // make parameters 
available in the web interface
DeserializationSchema<Tuple2<Long, String>> deserializationSchema = new 
SimpleStringAndTimestampDeserializationSchema();
FlinkKafkaConsumer010<Tuple2<Long, String>> kafkaConsumer = new 
FlinkKafkaConsumer010<>(topicName, deserializationSchema, 
kafkaParams.getProperties());

DataStream<Tuple2<Long, String>> fixMessagesStream = 
env.addSource(kafkaConsumer).name("fix_topic").setParallelism(2);

As you can see in the output, only 1 consumer partition seems to be used:
Producer output:
2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Source: 
random -> Sink: fix_topic.1.numRecordsInPerSecond: 0.0
2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Sink: 
fix_topic.1.numRecordsInPerSecond: 19836.0333
2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Sink: 
fix_topic.0.numRecordsInPerSecond: 20337.9334
2017-09-19 14:40:45,819 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Source: 
random -> Sink: fix_topic.0.numRecordsInPerSecond: 0.0
Consumer output:
2017-09-19 14:40:45,116 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.KafkaConsumer.select-rate: 1928.1421153709368
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.KafkaConsumer.commit-rate: 0.21623491761449637
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.KafkaConsumer.outgoing-byte-rate: 982.0051413881748
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.KafkaConsumer.sync-rate: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.KafkaConsumer.io-ratio: 0.01148712465103046
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.numRecordsOutPerSecond: 6625.2666
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.numRecordsInPerSecond: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.numBytesOutPerSecond: 1.40222884E7
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.0.numRecordsInPerSecond: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.0.numBytesInRemotePerSecond: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.0.numRecordsInPerSecond: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.0.numBytesOutPerSecond: 10.5
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.0.numBytesInLocalPerSecond: 0.0
2017-09-19