Re: Access logs for a running Flink app in YARN cluster

2018-04-12 Thread 杨力
Maybe you can get them from YARN with the REST API.
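
For example, something along these lines could list the running applications via the ResourceManager REST API and then pull per-container logs from the NodeManager web endpoint (host names, ports and the log file name are placeholders for an EMR setup, so treat this as a rough sketch rather than a tested recipe):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class YarnLogFetcher {

        public static void main(String[] args) throws Exception {
            // 1) List running applications via the ResourceManager REST API (default web port 8088).
            String apps = get("http://resourcemanager-host:8088/ws/v1/cluster/apps?states=RUNNING");
            System.out.println(apps);

            // 2) After extracting the container id, user and NodeManager host from the JSON,
            //    each container's logs are served by that NodeManager's web endpoint, e.g.
            //    http://nodemanager-host:8042/node/containerlogs/<container_id>/<user>/taskmanager.log
        }

        private static String get(String url) throws Exception {
            HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
            StringBuilder sb = new StringBuilder();
            try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
                String line;
                while ((line = in.readLine()) != null) {
                    sb.append(line).append('\n');
                }
            }
            return sb.toString();
        }
    }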

Tao Xia wrote on Fri, Apr 13, 2018 at 8:09 AM:

> Is there a good way to access container logs from a running Flink app in a YARN
> cluster in EMR?
> You can view the logs through the YARN UI, but you cannot programmatically access
> them and send them to other services.
> The log aggregator only runs when the application finishes, or on a minimum
> 3600-second copy interval. Is there any way we can get the logs more frequently?
>


Question about parallelism

2018-04-12 Thread TechnoMage
I am pretty new to Flink.  I have a Flink job that has 10 transforms (mostly 
CoFlatMap, with some simple filters and key extractors as well).  I have the 
config set for 6 slots and a default parallelism of 6, but all my stages show a 
parallelism of 1.  Is that because there is only one task manager?  Some of 
what I have read suggested that separate slots are needed to use multiple threads 
on a single box.  I have read the relevant section of the docs several times and am 
still not totally sure about the execution model.

Michael

Access logs for a running Flink app in YARN cluster

2018-04-12 Thread Tao Xia
Is there a good way to access container logs from a running Flink app in a YARN
cluster in EMR?
You can view the logs through the YARN UI, but you cannot programmatically access
them and send them to other services.
The log aggregator only runs when the application finishes, or on a minimum
3600-second copy interval. Is there any way we can get the logs more frequently?


Re: State management and heap usage

2018-04-12 Thread TechnoMage
Thank you.

Michael

> On Apr 12, 2018, at 2:45 AM, Gary Yao  wrote:
> 
> Hi Michael,
> 
> You can configure the default state backend by setting state.backend in
> flink-conf.yaml, or you can configure it per job [1]. The default state backend
> is "jobmanager" (MemoryStateBackend), which stores state and checkpoints on the
> Java heap. RocksDB must be explicitly enabled, e.g., by setting state.backend
> to "rocksdb".
> 
> Best,
> Gary
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/ops/state/state_backends.html#configuring-a-state-backend
>  
> 
> 
> On Wed, Apr 11, 2018 at 11:04 PM, TechnoMage wrote:
> I am pretty new to Flink and have an initial streaming job working both 
> locally and remotely.  But both ways, if the data volume is too high, it runs 
> out of heap.  I am using RichMapFunction to process multiple streams of data. 
> I assumed Flink would manage keeping state in RAM when possible, and spill 
> to RocksDB when it exceeded heap.
> 
> Is this correct?  If so are there configs I need to set to enable or tune 
> this so it can run within a fixed memory size?
> 
> Michael
> 



Re: Is Flink able to do real time stock market analysis?

2018-04-12 Thread TechnoMage
Given that the data from a window cannot arrive before any of the data in that 
window, it will always arrive after the raw data for the same period, and may 
have some latency relative to the raw data.  If your RichFlatMapFunction uses a 
ListState to hold more than one window's worth of raw and smoothed data, you 
should be able to get what you want.  Given distributed systems and relative 
time, I am not sure you will get simpler than that.

Michael
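
To make that concrete, here is a rough sketch of the kind of function Michael describes, buffering raw events in ListState until a smoothed value that is not later than them has arrived. The Tuple2<Long, Double> element type and the state names are made up for illustration, and the function assumes the two streams have been connected and keyed (e.g. by instrument) before the flatMap, since ListState/ValueState here are keyed state:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
    import org.apache.flink.util.Collector;

    /** Input 1: (timestamp, raw). Input 2: (timestamp, smoothed). Output: aligned (ts, raw, smoothed). */
    public class AlignRawWithSmoothed extends RichCoFlatMapFunction<
            Tuple2<Long, Double>, Tuple2<Long, Double>, Tuple3<Long, Double, Double>> {

        private transient ListState<Tuple2<Long, Double>> pendingRaw;
        private transient ValueState<Tuple2<Long, Double>> latestSmoothed;

        @Override
        public void open(Configuration parameters) {
            pendingRaw = getRuntimeContext().getListState(new ListStateDescriptor<>(
                    "pendingRaw", TypeInformation.of(new TypeHint<Tuple2<Long, Double>>() {})));
            latestSmoothed = getRuntimeContext().getState(new ValueStateDescriptor<>(
                    "latestSmoothed", TypeInformation.of(new TypeHint<Tuple2<Long, Double>>() {})));
        }

        @Override
        public void flatMap1(Tuple2<Long, Double> raw, Collector<Tuple3<Long, Double, Double>> out) throws Exception {
            Tuple2<Long, Double> smoothed = latestSmoothed.value();
            if (smoothed != null && smoothed.f0 <= raw.f0) {
                out.collect(Tuple3.of(raw.f0, raw.f1, smoothed.f1)); // matching smoothed value already here
            } else {
                pendingRaw.add(raw);                                  // wait for the smoothed stream to catch up
            }
        }

        @Override
        public void flatMap2(Tuple2<Long, Double> smoothed, Collector<Tuple3<Long, Double, Double>> out) throws Exception {
            latestSmoothed.update(smoothed);
            List<Tuple2<Long, Double>> stillPending = new ArrayList<>();
            Iterable<Tuple2<Long, Double>> buffered = pendingRaw.get();
            if (buffered != null) {
                for (Tuple2<Long, Double> raw : buffered) {
                    if (smoothed.f0 <= raw.f0) {
                        out.collect(Tuple3.of(raw.f0, raw.f1, smoothed.f1));
                    } else {
                        stillPending.add(raw);
                    }
                }
            }
            pendingRaw.clear();
            for (Tuple2<Long, Double> raw : stillPending) {
                pendingRaw.add(raw);
            }
        }
    }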

> On Apr 12, 2018, at 7:52 AM, Ivan Wang  wrote:
> 
> Thanks Michael very much, it helps a lot! 
>  
> I tried what you suggested and now I can compare smoothed data with raw data in 
> the coFlatMap method.
> However, it cannot ensure that the smoothed data is coming in the expected 
> way.  Basically, for every raw event, I'd like to refer to the earlier but 
> closest event in the smoothed data. However, that cannot be guaranteed by default. 
> For example, when a raw event comes with event time 13:01:39, I'd like to refer 
> to the smoothed event with event time 13:01:30 due to the 15-second interval. But 
> the latter only arrives after the raw event at 13:01:58; this happens at least in 
> batch processing when I did historical analysis.
> 
> I corrected the order by using keyed state in the coFlatMap method. I stored the 
> latest smoothed event and queued raw events if they arrived too early.
> 
> My question is: is there a better and more straightforward way to correct 
> the order? The current approach makes the code hard to read. I'm thinking about 
> watermarks, but am not sure how to do this.
>  
>  
> -- 
> Thanks
> Ivan
> From: TechnoMage 
> Date: Thursday, 12 April 2018 at 3:21 AM
> To: Ivan Wang 
> Cc: "user@flink.apache.org" 
> Subject: Re: Is Flink able to do real time stock market analysis?
>  
> I am new to Flink so others may have a more complete answer or correct me.
>  
> If you are counting the events in a tumbling window you will get output at 
> the end of each tumbling window, so a running count of events/window.  It 
> sounds like you want to compare the raw data to the smoothed data?  You can 
> use a CoFlatMap to receive both streams and output any records you like, say 
> a Tuple with the raw and smoothed value.  If you use a RichCoFlatMap you can 
> track state, so you could keep a list of the last 20 or so raw and smoothed 
> values so you can align them.
>  
> Michael
> 
> 
> On Apr 10, 2018, at 6:40 PM, Ivan Wang wrote:
>  
> Hi all, 
>  
> I've spent nearly 2 weeks trying to figure a solution to my requirement as 
> below. If anyone can advise, that would be great.
>  
> 1. There are going to be 2000 transactions per second as StreamRaw. I'm going 
> to tumbleWindow them as StreamA, let's say every 15 seconds. Then I'm going 
> to countWindow StreamA as StreamB, let's say every 20 events.
> 
> 2. For every event E in StreamRaw, I need to find exactly one event in 
> StreamB which is earlier than E and closest to E. Then some comparison will 
> be performed. For example, if the timestamp of E is 9:46:38, there should be an 
> event in StreamB with timestamp 9:46:30 because I use a 15-second interval. 
> 
> 
> I tried CEP using StreamRaw; however, I didn't figure out how to involve 
> StreamB and get that exact event in the condition method.
> 
> 
> I tried the Table API and SQL; it throws a time attribute error on the second 
> window method. 
> 
> 
> window(Tumble).group().select().window(Slide).group().select()
> 
> 
> It seems there's no way to tell Flink the time attribute after the first 
> window.group(). I then tried to convert it into a table first and then leftOuterJoin 
> them. But Flink tells me it's not supported.
>  
> Is Flink able to do this? If not, I'll go for other alternatives. Thanks 
> again if someone can help.



Re: Kafka Consumers Partition Discovery doesn't work

2018-04-12 Thread Tzu-Li (Gordon) Tai
Hi Juno,

Thanks for reporting back, glad to know that it's not an issue :)

In general, connector-specific configurations should always happen at the
connector level, per connector.
The flink-conf.yaml file is usually for cluster-wide configurations.

And yes, it might be helpful to have a code snippet to demonstrate the
configuration for partition discovery.
Could you open a JIRA for that?

Cheers,
Gordon
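
For the record, a minimal sketch of that configuration (topic name, group id and bootstrap servers are just placeholders, and an existing StreamExecutionEnvironment env plus the usual Kafka connector imports are assumed):

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "my-consumer-group");
    // Connector-level setting: it goes into the consumer's Properties, not flink-conf.yaml.
    // Check for newly added partitions every 60 seconds.
    props.setProperty("flink.partition-discovery.interval-millis", "60000");

    FlinkKafkaConsumer010<String> consumer =
            new FlinkKafkaConsumer010<>("my_topic", new SimpleStringSchema(), props);
    DataStream<String> stream = env.addSource(consumer);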

On Tue, Apr 10, 2018, 8:44 AM Juho Autio  wrote:

> Ahhh looks like I had simply misunderstood where that property should go.
>
> The docs correctly say:
> > To enable it, set a non-negative value for
> flink.partition-discovery.interval-millis in the __provided properties
> config__
>
> So it should be set in the Properties that are passed in the constructor
> of FlinkKafkaConsumer!
>
> I had somehow assumed that this should go to flink-conf.yaml (maybe
> because it starts with "flink."?), and obviously the FlinkKafkaConsumer
> doesn't read that.
>
> Sorry for the trouble. If anything, I guess a piece of example code
> might've helped me avoid this mistake. The docs are clear though, I just
> had become blind to this detail as I thought I had already read it.
>
> On Thu, Apr 5, 2018 at 10:26 AM, Juho Autio  wrote:
>
>> Still not working after I had a fresh build from
>> https://github.com/apache/flink/tree/release-1.5.
>>
>> When the job starts this is logged:
>>
>> 2018-04-05 09:29:38,157 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: flink.partition-discovery.interval-millis, 60000
>>
>> So that's 1 minute.
>>
>> As before, I added one more partition to a topic that is being consumed.
>> Secor started consuming it as expected, but Flink didn't – or at least it
>> isn't reporting anything about doing so. The new partition is not shown in
>> Flink task metrics or consumer offsets committed by Flink.
>>
>> How could I investigate this further? How about that additional logging
>> for partition discovery?
>>
>> On Thu, Mar 22, 2018 at 3:09 PM, Tzu-Li (Gordon) Tai wrote:
>>
>>> Hi,
>>>
>>> I think you’ve made a good point: there are currently no logs that tell
>>> anything about discovering a new partition. We should probably add this.
>>>
>>> And yes, it would be great if you can report back on this using either
>>> the latest master, release-1.5 or release-1.4 branches.
>>>
>>> On 22 March 2018 at 10:24:09 PM, Juho Autio (juho.au...@rovio.com)
>>> wrote:
>>>
>>> Thanks, that sounds promising. I don't know how to check if it's
>>> consuming all partitions? For example I couldn't find any logs about
>>> discovering a new partition. However, did I understand correctly that this
>>> is also fixed in Flink dev? If yes, I could rebuild my 1.5-SNAPSHOT and try
>>> again.
>>>
>>> On Thu, Mar 22, 2018 at 4:18 PM, Tzu-Li (Gordon) Tai <
>>> tzuli...@apache.org> wrote:
>>>
 Hi Juho,

 Can you confirm that the new partition is consumed, but only that
 Flink’s reported metrics do not include them?
 If yes, then I think your observations can be explained by this issue:
 https://issues.apache.org/jira/browse/FLINK-8419

 
 This issue should have been fixed in the recently released 1.4.2
 version.

 Cheers,
 Gordon

 On 22 March 2018 at 8:02:40 PM, Juho Autio (juho.au...@rovio.com)
 wrote:

 According to the docs*, flink.partition-discovery.interval-millis can
 be set to enable automatic partition discovery.

 I'm testing this, apparently it doesn't work.

 I'm using Flink Version: 1.5-SNAPSHOT Commit: 8395508
 and FlinkKafkaConsumer010.

 I had my flink stream running, consuming an existing topic with 3
 partitions, among some other topics.
 I modified partitions of an existing topic: 3 -> 4**.
 I checked consumer offsets by secor: it's now consuming all 4
 partitions.
 I checked consumer offset by my flink stream: it's still consuming only
 the 3 original partitions.

 I also checked the Task Metrics of this job from Flink UI and it only
 offers Kafka related metrics to be added for 3 partitions (0,1 & 2).

 According to Flink UI > Job Manager > Configuration:
 flink.partition-discovery.interval-millis=60000
 – so that's just 1 minute. It's already more than 20 minutes since I
 added the new partition, so Flink should've picked it up.

 How to debug?


 Btw, this job has external checkpoints enabled, done once per minute.
 Those are also succeeding.

 *)
 https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-topic-and-partition-discovery

 **)

 ~/kafka$ bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --describe
 --topic my_topic
 Topic:my_topic PartitionCount:3 ReplicationFactor:1 

Re: How to customize triggering of checkpoints?

2018-04-12 Thread Steven Wu
Syed, I am very curious about the motivation if you can share.

On Wed, Apr 11, 2018 at 1:35 AM, Chesnay Schepler 
wrote:

> Hello,
>
> there is no way to manually trigger checkpoints or configure irregular
> intervals.
>
> You will have to modify the CheckpointCoordinator
> 
> and build Flink from source:
>
>- startCheckpointScheduler() should only schedule a one-time execution
>of the trigger
>- ScheduledTrigger#run() should reschedule itself
>   - Something like:
>  - triggerCheckpoint(System.currentTimeMillis(), true);
>  - long msUntilNextCheckpoint = // insert logic here
>  - timer.schedule(new ScheduledTrigger(), msUntilNextCheckpoint,
>  TimeUnit.MILLISECONDS)
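
Fleshing out the steps above, the rescheduling could look roughly like this (a sketch against the CheckpointCoordinator internals, not actual Flink source; computeNextInterval() is a placeholder for your own interval logic):

    // Inside the patched CheckpointCoordinator: the trigger reschedules itself
    // with a freely chosen delay instead of running at a fixed rate.
    private final class ScheduledTrigger implements Runnable {
        @Override
        public void run() {
            try {
                triggerCheckpoint(System.currentTimeMillis(), true);
            } catch (Exception e) {
                LOG.error("Exception while triggering checkpoint.", e);
            }
            // Placeholder: e.g. wait 10s before the 2nd checkpoint, 15s before the 3rd, 20s before the 4th, ...
            long msUntilNextCheckpoint = computeNextInterval();
            timer.schedule(new ScheduledTrigger(), msUntilNextCheckpoint, TimeUnit.MILLISECONDS);
        }
    }
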
>
> On 11.04.2018 05:15, syed wrote:
>
> I am new to the Flink environment and am looking to analyze the triggering of
> checkpoints. I am looking to trigger non-periodic checkpoints such that
> checkpoint intervals are not of equal length, but not sure how can I do this
> in Flink.
>
> My specific query is;
>
> (1) How can I trigger non-periodic checkpoints in Flink? I am looking to
> trigger first checkpoint say after 10 seconds, the next checkpoint at say 25
> seconds, third at 45 seconds and so on. Can I define my own function which
> triggers non-periodic checkpoints and generates non-uniform checkpoint
> intervals?
> Thanks.
>
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
>


Re: keyBy and parallelism

2018-04-12 Thread Ken Krugler
I’m not sure I understand the actual use case, but …

Using a rebalance() to randomly distribute keys to operators is what I think 
you’d need to do to support “even if I have fewer keys than slots, I want each 
slot to take its share of the work”.

So it sounds like you want to (a) broadcast all rules (so every operator task 
has all of the rules), and then (b) randomly distribute the keys to the 
operator.

Then have a custom function that examines the keys to figure out what rule(s) 
to apply.

There are often timing issues here, where you have to buffer keys while waiting 
for all (to some definition of “all”) the rules to arrive before you start 
processing the keys.

— Ken
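
A sketch of that shape, assuming both streams carry (key, value) pairs as Tuple2<String, String> and an existing StreamExecutionEnvironment env is in scope (the rule map below is operator-local and not checkpointed, and the buffering of early events is only hinted at in a comment, so this is illustrative rather than production-ready):

    DataStream<Tuple2<String, String>> rules = env.fromElements(Tuple2.of("key1", "uppercase"));
    DataStream<Tuple2<String, String>> events = env.fromElements(Tuple2.of("key1", "hello"));

    DataStream<String> result = events
        .rebalance()                    // spread keys evenly over all subtasks
        .connect(rules.broadcast())     // every subtask sees every rule
        .flatMap(new RichCoFlatMapFunction<Tuple2<String, String>, Tuple2<String, String>, String>() {

            // operator-local view of all rules (the stream is not keyed, so no keyed state here)
            private final Map<String, String> rulesByKey = new HashMap<>();

            @Override
            public void flatMap1(Tuple2<String, String> event, Collector<String> out) {
                String rule = rulesByKey.get(event.f0);
                if (rule != null) {
                    out.collect(rule + ":" + event.f1);   // placeholder for the real transformation
                }
                // else: buffer the event until its rule has arrived
            }

            @Override
            public void flatMap2(Tuple2<String, String> rule, Collector<String> out) {
                rulesByKey.put(rule.f0, rule.f1);
            }
        });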

> On Apr 12, 2018, at 2:44 AM, Christophe Jolif  wrote:
> 
> Sihua,
> 
> On Thu, Apr 12, 2018 at 10:04 AM, 周思华 wrote:
> Hi Christophe,
> I think what you want to do is a "stream join", and I'm a bit confused: if 
> you know there are only 8 keys, then why would you still like to use a 
> parallelism of 16? 8 of the subtasks will be idle (a waste of CPU). In a 
> KeyedStream, the tuples with the same key will be sent to the same subtask. 
> 
> 
> First, the 8 keys and parallelism of 16 are just an example. In real life it is a bit 
> more complicated. But basically the idea is that I have a certain number of 
> task slots, and I want to keep them busy so that my processing is as fast as 
> possible. Even if I have fewer keys than slots, I want each slot to take its 
> share of the work.
>  
> 
> And I'm also a bit confused about the pseudo code; it looks like you assume 
> that the tuple with the same key in stream A will always arrive before the 
> tuple in stream B? I think that can't be guaranteed... you may need to store 
> the tuple from stream B in case a tuple in stream B arrives before A, and do 
> the "analysis logic" in both flatMap1() and flatMap2().
> 
> 
> You are right. I just wanted to focus on my issue, which is:
> 
> 1/ having a co-processing function that considers only elements of the same key and 
> that can store the "rules" in keyed state (and, as you said, I might have to 
> store other things for ordering reasons)
> 2/ but being able to parallelize a given key to use as much parallelism as my 
> cluster allows.
> 
> 
> Regards,
> Sihua Zhou
> 
> On 04/12/2018 15:44,Christophe Jolif 
>  wrote:
> Thanks Chesnay (and others).
> 
> That's what I was figuring out. Now let's go onto the follow up with my exact 
> use-case.
> 
> I have two streams A and B. A basically receives "rules" that the processing 
> of B should observe to process.
> 
> There is a "key" that allows me to know that a rule x coming in A is for 
> events with the same key coming in B.
> 
> I was planning to do (pseudo code):
> 
> A.connect(B).keyBy("thekey").flatMap(
>flatMap1()
>   -> store in a ValueState the rule 
>flatMap2()
>   -> use the state to get the rule, transform the element according to 
> the rule, collect it
> )
> 
> 
> I think it should work, right, because the ValueState will be "per key" and 
> contain the rule for this key and so on?
> 
> Now, what I really care about is not having all the elements of key1 in the same 
> parallel instance; I just want to make sure key1 and key2 are isolated so I can use 
> the keyed state to store the corresponding rule, and key2's rules are not used for 
> key1 and conversely.
> 
> So ideally, instead of using a parallelism of 8, in order to use the full power of 
> my system, even with 8 keys I would like to use a parallelism of 16, as I don't 
> care about all elements of key1 being in the same parallel instance. All I care about 
> is that the state contains the rule corresponding to this key.
> 
> What would be the recommended approach here?
> 
> Thanks again for your help,
> --
> Christophe
> 
> 
> On Thu, Apr 12, 2018 at 9:31 AM, Chesnay Schepler wrote:
>> You will get 16 parallel executions since you specify a parallelism of 16; 
>> however, 8 of these will not get any data.
> 
> 
> On 11.04.2018 23:29, Hao Sun wrote:
>> From what I learnt, you have to control parallelism yourself. You can set 
>> parallelism on an operator or set a default one through flink-conf.yaml.
>> I might be wrong.
>> 
>> On Wed, Apr 11, 2018 at 2:16 PM Christophe Jolif wrote:
>> Hi all,
>> 
>> Imagine I have a default parallelism of 16 and I do something like
>> 
>> stream.keyBy("something").flatMap()
>> 
>> Now let's imagine I have less than 16 keys, maybe 8.
>> 
>> How many parallel executions of the flatMap function will I get? 8 because I 
>> have 8 keys, or 16 because I have default parallelism at 16?
>> 
>> (and I will have follow up questions depending on the answer I suspect ;))
>> 
>> Thanks,
>> -- 
>> Christophe
> 
> 
> 
> 


http://about.me/kkrugler
+1 530-210-6378



Re: Is Flink able to do real time stock market analysis?

2018-04-12 Thread Ivan Wang
Thanks Michael very much, it helps a lot!

I tried what you suggested and now I can compare smoothed data with raw data in 
the coFlatMap method.
However, it cannot ensure that the smoothed data is coming in the expected way. 
Basically, for every raw event, I'd like to refer to the earlier but closest 
event in the smoothed data. However, that cannot be guaranteed by default. For 
example, when a raw event comes with event time 13:01:39, I'd like to refer to 
the smoothed event with event time 13:01:30 due to the 15-second interval. But the 
latter only arrives after the raw event at 13:01:58; this happens at least in batch 
processing when I did historical analysis.

I corrected the order by using keyed state in the coFlatMap method. I stored the 
latest smoothed event and queued raw events if they arrived too early.

My question is: is there a better and more straightforward way to correct the 
order? The current approach makes the code hard to read. I'm thinking about watermarks, 
but am not sure how to do this.


--
Thanks
Ivan
From: TechnoMage 
Date: Thursday, 12 April 2018 at 3:21 AM
To: Ivan Wang 
Cc: "user@flink.apache.org" 
Subject: Re: Is Flink able to do real time stock market analysis?

I am new to Flink so others may have a more complete answer or correct me.

If you are counting the events in a tumbling window you will get output at the 
end of each tumbling window, so a running count of events/window.  It sounds 
like you want to compare the raw data to the smoothed data?  You can use a 
CoFlatMap to receive both streams and output any records you like, say a Tuple 
with the raw and smoothed value.  If you use a RichCoFlatMap you can track 
state, so you could keep a list of the last 20 or so raw and smoothed values so 
you can align them.

Michael


On Apr 10, 2018, at 6:40 PM, Ivan Wang wrote:

Hi all,

I've spent nearly 2 weeks trying to figure a solution to my requirement as 
below. If anyone can advise, that would be great.

1. There are going to be 2000 transactions per second as StreamRaw. I'm going to 
tumbleWindow them as StreamA, let's say every 15 seconds. Then I'm going to 
countWindow StreamA as StreamB, let's say every 20 events.

2. For every event E in StreamRaw, I need to find exactly one event in 
StreamB which is earlier than E and closest to E. Then some comparison will be 
performed. For example, if the timestamp of E is 9:46:38, there should be an event 
in StreamB with timestamp 9:46:30 because I use a 15-second interval.


I tried CEP using StreamRaw; however, I didn't figure out how to involve 
StreamB and get that exact event in the condition method.


I tried the Table API and SQL; it throws a time attribute error on the second 
window method.


window(Tumble).group().select().window(Slide).group().select()


It seems there's no way to tell Flink the time attribute after the first 
window.group(). I then tried to convert it into a table first and then leftOuterJoin 
them. But Flink tells me it's not supported.

Is Flink able to do this? If not, I'll go for other alternatives. Thanks again 
if someone can help.


Re: keyBy and parallelism

2018-04-12 Thread Christophe Jolif
Sihua,

On Thu, Apr 12, 2018 at 10:04 AM, 周思华  wrote:

> Hi Christophe,
> I think what you want to do is a "stream join", and I'm a bit confused: if
> you know there are only 8 keys, then why would you still like to use a
> parallelism of 16? 8 of the subtasks will be idle (a waste of CPU). In a
> KeyedStream, the tuples with the same key will be sent to the same subtask.
>


First, the 8 keys and parallelism of 16 are just an example. In real life it is a bit
more complicated. But basically the idea is that I have a certain number of
task slots, and I want to keep them busy so that my processing is as fast as
possible. Even if I have fewer keys than slots, I want each slot to take its
share of the work.


>
> And I'm also a bit confused about the pseudo code; it looks like you assume
> that the tuple with the same key in stream A will always arrive before the
> tuple in stream B? I think that can't be guaranteed... you may need to store
> the tuple from stream B in case a tuple in stream B arrives before A, and
> do the "analysis logic" in both flatMap1() and flatMap2().
>


You are right. I just wanted to focus on my issue, which is:

1/ having a co-processing function that considers only elements of the same key
and that can store the "rules" in keyed state (and, as you said, I might
have to store other things for ordering reasons)
2/ but being able to parallelize a given key to use as much parallelism as
my cluster allows.


Regards,
> Sihua Zhou
>
> On 04/12/2018 15:44,Christophe Jolif 
> wrote:
>
> Thanks Chesnay (and others).
>
> That's what I was figuring out. Now let's go onto the follow up with my
> exact use-case.
>
> I have two streams A and B. A basically receives "rules" that the
> processing of B should observe to process.
>
> There is a "key" that allows me to know that a rule x coming in A is for
> events with the same key coming in B.
>
> I was planning to do (pseudo code):
>
> A.connect(B).keyBy("thekey").flatMap(
>flatMap1()
>   -> store in a ValueState the rule
>flatMap2()
>   -> use the state to get the rule, transform the element according to
> the rule, collect it
> )
>
>
> I think it should work, right, because the ValueState will be "per key"
> and contain the rule for this key and so on?
>
> Now, what I really care about is not having all the elements of key1 in the same
> parallel instance; I just want to make sure key1 and key2 are isolated so I can
> use the keyed state to store the corresponding rule, and key2's rules are not
> used for key1 and conversely.
>
> So ideally, instead of using a parallelism of 8, in order to use the full
> power of my system, even with 8 keys I would like to use a parallelism of 16, as
> I don't care about all elements of key1 being in the same parallel instance. All
> I care about is that the state contains the rule corresponding to this key.
>
> What would be the recommended approach here?
>
> Thanks again for your help,
> --
> Christophe
>
>
> On Thu, Apr 12, 2018 at 9:31 AM, Chesnay Schepler 
> wrote:
>
>> You will get 16 parallel executions since you specify a parallelism of
>> 16; however, 8 of these will not get any data.
>>
>>
>> On 11.04.2018 23:29, Hao Sun wrote:
>>
>> From what I learnt, you have to control parallelism yourself. You can
>> set parallelism on an operator or set a default one through flink-conf.yaml.
>> I might be wrong.
>>
>> On Wed, Apr 11, 2018 at 2:16 PM Christophe Jolif 
>> wrote:
>>
>>> Hi all,
>>>
>>> Imagine I have a default parallelism of 16 and I do something like
>>>
>>> stream.keyBy("something").flatMap()
>>>
>>> Now let's imagine I have less than 16 keys, maybe 8.
>>>
>>> How many parallel executions of the flatMap function will I get? 8
>>> because I have 8 keys, or 16 because I have default parallelism at 16?
>>>
>>> (and I will have follow up questions depending on the answer I suspect
>>> ;))
>>>
>>> Thanks,
>>> --
>>> Christophe
>>>
>>
>>


Re: State management and heap usage

2018-04-12 Thread Gary Yao
Hi Michael,

You can configure the default state backend by setting state.backend in
flink-conf.yaml, or you can configure it per job [1]. The default state backend
is "jobmanager" (MemoryStateBackend), which stores state and checkpoints on the
Java heap. RocksDB must be explicitly enabled, e.g., by setting state.backend
to "rocksdb".

Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/state_backends.html#configuring-a-state-backend
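
For example, a per-job setup along those lines might look like this (the checkpoint URI is a placeholder, and the flink-statebackend-rocksdb dependency has to be on the classpath; a sketch, not a drop-in config). With RocksDB, working state then lives in native memory and on local disk rather than on the JVM heap:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RocksDbBackendJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Per-job equivalent of "state.backend: rocksdb" in flink-conf.yaml.
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

            // ... build the actual pipeline here ...
            env.execute("job-with-rocksdb-state-backend");
        }
    }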

On Wed, Apr 11, 2018 at 11:04 PM, TechnoMage  wrote:

> I am pretty new to Flink and have an initial streaming job working both
> locally and remotely.  But both ways, if the data volume is too high, it
> runs out of heap.  I am using RichMapFunction to process multiple streams
> of data.  I assumed Flink would manage keeping state in RAM when possible,
> and spill to RocksDB when it exceeded heap.
>
> Is this correct?  If so are there configs I need to set to enable or tune
> this so it can run within a fixed memory size?
>
> Michael


Re: keyBy and parallelism

2018-04-12 Thread 周思华
Hi Christophe,
I think what you want to do is a "stream join", and I'm a bit confused: if you 
know there are only 8 keys, then why would you still like to use a parallelism 
of 16? 8 of the subtasks will be idle (a waste of CPU). In a KeyedStream, the 
tuples with the same key will be sent to the same subtask. 


And I'm also a bit confused about the pseudo code; it looks like you assume that 
the tuple with the same key in stream A will always arrive before the tuple in 
stream B? I think that can't be guaranteed... you may need to store the tuple from 
stream B in case a tuple in stream B arrives before A, and do the "analysis 
logic" in both flatMap1() and flatMap2().


Regards,
Sihua Zhou


On 04/12/2018 15:44,Christophe Jolif wrote:
Thanks Chesnay (and others).


That's what I was figuring out. Now let's go onto the follow up with my exact 
use-case.


I have two streams A and B. A basically receives "rules" that the processing of 
B should observe to process.


There is a "key" that allows me to know that a rule x coming in A is for events 
with the same key coming in B.


I was planning to do (pseudo code):


A.connect(B).keyBy("thekey").flatMap(
   flatMap1()
  -> store in a ValueState the rule 
   flatMap2()
  -> use the state to get the rule, transform the element according to the 
rule, collect it
)




I think it should work, right, because the ValueState will be "per key" and 
contain the rule for this key and so on?


Now, what I really care about is not having all the elements of key1 in the same 
parallel instance; I just want to make sure key1 and key2 are isolated so I can use 
the keyed state to store the corresponding rule, and key2's rules are not used for 
key1 and conversely.


So ideally, instead of using a parallelism of 8, in order to use the full power of 
my system, even with 8 keys I would like to use a parallelism of 16, as I don't care 
about all elements of key1 being in the same parallel instance. All I care about is 
that the state contains the rule corresponding to this key.


What would be the recommended approach here?


Thanks again for your help,
--
Christophe




On Thu, Apr 12, 2018 at 9:31 AM, Chesnay Schepler  wrote:

You will get 16 parallel executions since you specify a parallelism of 16; 
however, 8 of these will not get any data.


On 11.04.2018 23:29, Hao Sun wrote:

From what I learnt, you have to control parallelism yourself. You can set 
parallelism on an operator or set a default one through flink-conf.yaml.
I might be wrong.


On Wed, Apr 11, 2018 at 2:16 PM Christophe Jolif  wrote:

Hi all,


Imagine I have a default parallelism of 16 and I do something like


stream.keyBy("something").flatMap()


Now let's imagine I have less than 16 keys, maybe 8.


How many parallel executions of the flatMap function will I get? 8 because I 
have 8 keys, or 16 because I have default parallelism at 16?


(and I will have follow up questions depending on the answer I suspect ;))


Thanks,
--

Christophe




Re: keyBy and parallelism

2018-04-12 Thread Christophe Jolif
Thanks Chesnay (and others).

That's what I was figuring out. Now let's go onto the follow up with my
exact use-case.

I have two streams A and B. A basically receives "rules" that the
processing of B should observe to process.

There is a "key" that allows me to know that a rule x coming in A is for
events with the same key coming in B.

I was planning to do (pseudo code):

A.connect(B).keyBy("thekey").flatMap(
   flatMap1()
  -> store in a ValueState the rule
   flatMap2()
  -> use the state to get the rule, transform the element according to
the rule, collect it
)


I think it should work, right, because the ValueState will be "per key" and
contain the rule for this key and so on?

Now, what I really care about is not having all the elements of key1 in the same
parallel instance; I just want to make sure key1 and key2 are isolated so I can
use the keyed state to store the corresponding rule, and key2's rules are not
used for key1 and conversely.

So ideally, instead of using a parallelism of 8, in order to use the full power
of my system, even with 8 keys I would like to use a parallelism of 16, as I
don't care about all elements of key1 being in the same parallel instance. All I
care about is that the state contains the rule corresponding to this key.

What would be the recommended approach here?

Thanks again for your help,
--
Christophe
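
For what it's worth, a more concrete version of the pseudo code above could look roughly like this (Rule and Event are invented POJOs with a public "thekey" field, and transform() is a placeholder; as discussed elsewhere in the thread, flatMap2 would also have to buffer events that arrive before their rule):

    A.connect(B)
     .keyBy("thekey", "thekey")
     .flatMap(new RichCoFlatMapFunction<Rule, Event, Event>() {

         private transient ValueState<Rule> ruleState;

         @Override
         public void open(Configuration parameters) {
             ruleState = getRuntimeContext().getState(
                     new ValueStateDescriptor<>("rule", Rule.class));
         }

         @Override
         public void flatMap1(Rule rule, Collector<Event> out) throws Exception {
             ruleState.update(rule);                  // remember the rule for this key
         }

         @Override
         public void flatMap2(Event event, Collector<Event> out) throws Exception {
             Rule rule = ruleState.value();
             if (rule != null) {
                 out.collect(event.transform(rule));  // apply this key's rule
             }
             // else: buffer the event (e.g. in ListState) until its rule arrives
         }
     });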


On Thu, Apr 12, 2018 at 9:31 AM, Chesnay Schepler 
wrote:

> You will get 16 parallel executions since you specify a parallelism of
> 16; however, 8 of these will not get any data.
>
>
> On 11.04.2018 23:29, Hao Sun wrote:
>
> From what I learnt, you have to control parallelism yourself. You can set
> parallelism on an operator or set a default one through flink-conf.yaml.
> I might be wrong.
>
> On Wed, Apr 11, 2018 at 2:16 PM Christophe Jolif  wrote:
>
>> Hi all,
>>
>> Imagine I have a default parallelism of 16 and I do something like
>>
>> stream.keyBy("something").flatMap()
>>
>> Now let's imagine I have less than 16 keys, maybe 8.
>>
>> How many parallel executions of the flatMap function will I get? 8
>> because I have 8 keys, or 16 because I have default parallelism at 16?
>>
>> (and I will have follow up questions depending on the answer I suspect ;))
>>
>> Thanks,
>> --
>> Christophe
>>
>
>


Re: Kafka consumer to sync topics by event time?

2018-04-12 Thread Juho Autio
Looks like the bug https://issues.apache.org/jira/browse/FLINK-5479 is
entirely preventing this feature from being used if there are any idle
partitions. It would be nice to mention in the documentation that currently
this requires all subscribed partitions to have a constant stream of data
with growing timestamps. When the watermark gets stalled on an idle partition,
it blocks everything.

Link to current documentation:
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
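
For reference, the per-partition watermarking those docs describe is configured on the consumer itself before it is added as a source; roughly like this (MyEvent, its schema and the 10-second bound are only illustrative):

    FlinkKafkaConsumer010<MyEvent> consumer =
            new FlinkKafkaConsumer010<>("my_topic", new MyEventSchema(), props);

    // Watermarks are generated inside the source, per Kafka partition; the emitted
    // watermark is the minimum over all subscribed partitions, which is why a single
    // idle partition can stall it (FLINK-5479).
    consumer.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
                @Override
                public long extractTimestamp(MyEvent event) {
                    return event.getEventTime();
                }
            });

    DataStream<MyEvent> stream = env.addSource(consumer);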

On Mon, Dec 4, 2017 at 4:29 PM, Fabian Hueske  wrote:

> You are right, offsets cannot be used for tracking processing progress. I
> think setting Kafka offsets with respect to some progress notion other than
> "has been consumed" would be highly application specific and hard to
> generalize.
> As you said, there might be a window (such as a session window) that is
> open much longer than all other windows and which would hold back the
> offset. Other applications might not use the built-in windows at all but
> custom ProcessFunctions.
>
> Have you considered tracking progress using watermarks?
>
> 2017-12-04 14:42 GMT+01:00 Juho Autio :
>
>> Thank you Fabian. Really clear explanation. That matches with my
>> observation indeed (data is not dropped from either small or big topic, but
>> the offsets are advancing in kafka side already before those offsets have
>> been triggered from a window operator).
>>
>> This means that it's a bit harder to meaningfully monitor the job's
>> progress solely based on kafka consumer offsets. Is there a reason why
>> Flink couldn't instead commit the offsets after they have been triggered
>> from downstream windows? I could imagine that this might pose a problem if
>> there are any windows that remain open for a very long time, but in general
>> it would be useful IMHO. Or Flink could even commit both (read vs.
>> triggered) offsets to kafka for monitoring purposes.
>>
>> On Mon, Dec 4, 2017 at 3:30 PM, Fabian Hueske  wrote:
>>
>>> Hi Juho,
>>>
>>> the partitions of both topics are independently consumed, i.e., at their
>>> own speed without coordination. With the configuration that Gordon linked,
>>> watermarks are generated per partition.
>>> Each source task maintains the latest (and highest) watermark per
>>> partition and propagates the smallest watermark. The same mechanism is
>>> applied for watermarks across tasks (this is what Kien referred to).
>>>
>>> In the case that you are describing, the partitions of the smaller topic
>>> are faster consumed (hence the offsets are faster aligned) but watermarks
>>> are emitted "at the speed" of the bigger topic.
>>> Therefore, the timestamps of records from the smaller topic can be much
>>> ahead of the watermark.
>>> In principle, that does not pose a problem. Stateful operators (such as
>>> windows) remember the "early" records and process them when they receive a
>>> watermark that passes the timestamps of the early records.
>>>
>>> Regarding your question "Are they committed to Kafka before their
>>> watermark has passed on Flink's side?":
>>> The offsets of the smaller topic might be checkpointed when all
>>> partitions have been read to the "end" and the bigger topic is still
>>> catching up.
>>> The watermarks are moving at the speed of the bigger topic, but all
>>> "early" events of the smaller topic are stored in stateful operators and
>>> are checkpointed as well.
>>>
>>> So, you lose neither early nor late data.
>>>
>>> Best, Fabian
>>>
>>>
>>>
>>> 2017-12-01 13:43 GMT+01:00 Juho Autio :
>>>
 Thanks for the answers, I still don't understand why I can see the
 offsets being quickly committed to Kafka for the "small topic"? Are they
 committed to Kafka before their watermark has passed on Flink's side? That
 would be quite confusing.. Indeed when Flink handles the state/offsets
 internally, the consumer offsets are committed to Kafka just for reference.

 Otherwise, what you're saying sounds very good to me. The documentation
 just doesn't explicitly say anything about how it works across topics.

 On Kien's answer: "When you join multiple stream with different
 watermarks", note that I'm not joining any topics myself, I get them as a
 single stream from the Flink kafka consumer based on the list of topics
 that I asked for.

 Thanks,
 Juho

 On Wed, Nov 22, 2017 at 2:57 PM, Tzu-Li (Gordon) Tai <
 tzuli...@apache.org> wrote:

> Hi!
>
> The FlinkKafkaConsumer can handle watermark advancement with
> per-Kafka-partition awareness (across partitions of different topics).
> You can see an example of how to do that here [1].
>
> Basically what this does is that it generates watermarks within the
> Kafka
> consumer individually for each Kafka partition, and the per-partition
> 

Re: keyBy and parallelism

2018-04-12 Thread Chesnay Schepler
You will get 16 parallel executions since you specify a parallelism of 
16; however, 8 of these will not get any data.


On 11.04.2018 23:29, Hao Sun wrote:
From what I learnt, you have to control parallelism yourself. You can 
set parallelism on an operator or set a default one through flink-conf.yaml.

I might be wrong.

On Wed, Apr 11, 2018 at 2:16 PM Christophe Jolif wrote:


Hi all,

Imagine I have a default parallelism of 16 and I do something like

stream.keyBy("something").flatMap()

Now let's imagine I have less than 16 keys, maybe 8.

How many parallel executions of the flatMap function will I get? 8
because I have 8 keys, or 16 because I have default parallelism at 16?

(and I will have follow up questions depending on the answer I
suspect ;))

Thanks,
-- 
Christophe