Re: A "per operator instance" window all ?

2018-02-18 Thread 周思华
Hi Julien,
If I am not misunderstanding, I think you can key your stream on something like 
`Random.nextInt() % parallelism`; this way you can "group" together alerts from 
different resources and still benefit from a parallelism greater than one.
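
A rough sketch of this idea (the socket source, the class name and the placeholder
enrichment are only assumptions for the example). Note that, as far as I know, the key
selector is evaluated again downstream to set the keyed context, so it should be
deterministic per record; the sketch therefore derives the bucket from the record's
hash instead of calling Random directly:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class BucketedEnrichmentSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final int buckets = 4; // match the parallelism you want for the enrichment step

        // stand-in for the Kafka alert source; each String is e.g. "alertId,resourceId"
        DataStream<String> alerts = env.socketTextStream("localhost", 9999);

        alerts
            // derive a small, deterministic bucket key from the record itself, so that
            // one bucket groups alerts from many different resource ids
            .keyBy(new KeySelector<String, Integer>() {
                @Override
                public Integer getKey(String alert) {
                    return Math.abs(alert.hashCode() % buckets);
                }
            })
            .window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))
            .process(new ProcessWindowFunction<String, String, Integer, TimeWindow>() {
                @Override
                public void process(Integer bucket, Context ctx,
                                    Iterable<String> bufferedAlerts, Collector<String> out) {
                    List<String> batch = new ArrayList<>();
                    bufferedAlerts.forEach(batch::add);
                    // here you would issue ONE query to the external system for the whole
                    // batch; the "enrichment" below is only a placeholder
                    for (String alert : batch) {
                        out.collect(alert + ",enriched");
                    }
                }
            })
            .print();

        env.execute("bucketed-enrichment-sketch");
    }
}

Also keep in mind the point raised further down in this thread: with only a handful of
distinct bucket values, the key-group assignment may not spread the buckets evenly
over all operator instances.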




Sent from NetEase Mail Master


On 02/19/2018 09:08, Xingcan Cui wrote:
Hi Julien,


sorry for my misunderstanding before. For now, the window can only be defined 
on a KeyedStream or an ordinary DataStream but with parallelism = 1. I’d like 
to provide three options for your scenario.


1. If your external data is static and can fit into memory, you can use 
ManagedStates to cache them without considering the querying problem.
2. Or you can use a CustomPartitioner to manually distribute your alert data 
and simulate a window operation by yourself in a ProcessFunction (a rough 
sketch of this option follows below).
3. You may also choose to use some external systems such as an in-memory store, 
which can work as a cache for your queries.
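
For option 2, a rough sketch of what this could look like (the partitioner, the flush
threshold and the placeholder enrichment are all assumptions; as far as I know, timers
cannot be registered on a non-keyed stream, so the sketch flushes on a simple count
threshold instead of a time window):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class CustomPartitionEnrichmentSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // stand-in for the Kafka alert source
        DataStream<String> alerts = env.socketTextStream("localhost", 9999);

        alerts
            // spread the alerts over the downstream instances with a custom partitioner
            .partitionCustom(new Partitioner<Integer>() {
                @Override
                public int partition(Integer key, int numPartitions) {
                    return Math.abs(key % numPartitions);
                }
            }, new KeySelector<String, Integer>() {
                @Override
                public Integer getKey(String alert) {
                    return alert.hashCode();
                }
            })
            // buffer alerts per instance and flush them as one batched "query"
            .flatMap(new RichFlatMapFunction<String, String>() {
                private transient List<String> buffer;

                @Override
                public void open(Configuration parameters) {
                    buffer = new ArrayList<>();
                }

                @Override
                public void flatMap(String alert, Collector<String> out) {
                    buffer.add(alert);
                    if (buffer.size() >= 25) { // flush threshold is an arbitrary assumption
                        // ONE query to the external system for the whole buffer would go here
                        for (String a : buffer) {
                            out.collect(a + ",enriched"); // placeholder enrichment
                        }
                        buffer.clear();
                    }
                    // note: records still sitting in the buffer are lost on failure or
                    // shutdown in this simplistic sketch
                }
            })
            .setParallelism(4)
            .print();

        env.execute("custom-partition-enrichment-sketch");
    }
}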


Best,
Xingcan



On 19 Feb 2018, at 5:55 AM, Julien  wrote:


Hi Xingcan,

Thanks for your answer.
Yes, I understand that point:

if I have 100 resource IDs with parallelism of 4, then each operator instance 
will handle about 25 keys




The issue I have is that I want, on a given operator instance, to group those 
25 keys together in order to do only 1 query to an external system per operator 
instance:

on a given operator instance, I will do 1 query for my 25 keys
so with the 4 operator instances, I will do 4 queries in parallel (with about 25 
keys per query)


I do not know how I can do that.

If I define a window on my keyed stream (with for example 
stream.keyBy(_.resourceId).window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))),
 then my understanding is that the window is "associated" with the key. So in 
this case, on a given operator instance, I will have 25 of those windows (one 
per key), and I will do 25 queries (instead of 1).

Do you understand my point ?
Or maybe am I missing something ?

I'd like to find a way on operator instance 1 to group all the alerts received 
on those 25 resource ids and do 1 query for those 25 resource ids.
Same thing for operator instance 2, 3 and 4.


Thank you,
Regards.


On 18/02/2018 14:43, Xingcan Cui wrote:

Hi Julien,


the cardinality of your keys (e.g., resource ID) will not be restricted to the 
parallelism. For instance, if you have 100 resource IDs processed by 
KeyedStream with parallelism 4, each operator instance will handle about 25 
keys. 


Hope that helps.


Best,
Xingcan


On 18 Feb 2018, at 8:49 PM, Julien  wrote:



Hi,

I am pretty new to flink and I don't know what will be the best way to deal 
with the following use case:

as an input, I receive some alerts from a Kafka topic
an alert is linked to a network resource (like router-1, router-2, switch-1, 
switch-2, ...)
so an alert has two main pieces of information (the alert id and the resource id of the 
resource on which this alert has been raised)
then I need to do a query to an external system in order to enrich the alert 
with additional information on the resource

(A "natural" candidate for the key on this stream will be the resource id)

The issue I have regards the query to the external system:

I do not want to do 1 query per resource id
I want to do a small number of queries in parallel (for example 4 queries in 
parallel every 500ms), each query requesting the external system for several 
alerts linked to several resource ids
Currently, I don't know what will be the best way to deal with that:

I can key my stream on the resource id and then define a processing time window 
of 500ms and when the trigger is ok, then I do my query
by doing so, I will "group" several alerts in a single query, but they will all 
be linked to the same resource.
so I will do 1 query per resource id (which will be too much in my use case)
I can also do a windowAll on a non keyed stream
by doing so, I will "group" together alerts from different resource ids, but 
from what I've read in such a case the parallelism will always be one.
so in this case, I will only do 1 query whereas I'd like to have some 
parallelism

I am thinking that a way to deal with that will be:

define the resource id as the key of the stream and set a parallelism of 4
and then have a way to do a windowAll on this keyed stream
which is that, on a given operator instance, I will "group" on the same window 
all the keys (ie all the resource ids) managed by this operator instance
with a parallelism of 4, I will do 4 queries in parallel (1 per operator 
instance, and each query will be for several alerts linked to several resource 
ids)

But after looking at the documentation, I cannot see this ability (having a 
windowAll on a keyed stream).

Am I missing something?

What will be the best way to deal with such a use case?




I've tried for example to review my key and to do something like 
"resourceId.hashCode() % parallelism" and then to use a time window.

In my example above, the parallelism will be 4, and all my keys will be 0, 1, 2 or 3.

Re: Need to understand the execution model of the Flink

2018-02-18 Thread Darshan Singh
Thanks for reply.

I guess I am not looking for an alternative. I am trying to understand what
Flink does in this scenario and if 10 tasks are going in parallel I am sure
they will be reading the csv as there is no other way.

Thanks

On Mon, Feb 19, 2018 at 12:48 AM, Niclas Hedhman  wrote:

>
> Do you really need the large single table created in step 2?
>
> If not, what you typically do is that the Csv source first does the common
> transformations. Then depending on whether the 10 outputs have different
> processing paths or not, you either do a split() to do individual
> processing depending on some criteria, or you just have the sink put each
> record in separate tables.
> You have full control, at each step along the transformation path, over whether
> it can be parallelized or not, and if there are no sequential constraints
> on your model, then you can fill all cores on all hosts quite easily.
>
> Even if you need the step 2 table, I would still just treat that as a
> split(), a branch ending in a Sink that does the storage there. No need to
> read records from the file over and over again, nor to store them first in the
> step 2 table and read them out again.
>
> Don't ask *me* about what happens in failure scenarios... I have myself
> not figured that out yet.
>
> HTH
> Niclas
>
> On Mon, Feb 19, 2018 at 3:11 AM, Darshan Singh 
> wrote:
>
>> Hi I would like to understand the execution model.
>>
>> 1. I have a csv file which is, say, 10 GB.
>> 2. I created a table from this file.
>>
>> 3. Now I have created filtered tables on this, say 10 of them.
>> 4. Now I created a writeToSink for all these 10 filtered tables.
>>
>> Now my question is: will these 10 filtered tables be written in
>> parallel (suppose I have 40 cores and set the parallelism to say 40 as well)?
>>
>> Next question I have is that the table which I created from the csv file,
>> which is common, won't be persisted by Flink internally; rather, for all 10
>> filtered tables it will read the csv file and then apply the filter and write
>> to the sink.
>>
>> I think that for all 10 filtered tables it will read the csv again and again;
>> in this case it will be read 10 times. Is my understanding correct or am I
>> missing something?
>>
>> What if in step 2 I change the table to a dataset and back?
>>
>> Thanks
>>
>
>
>
> --
> Niclas Hedhman, Software Developer
> http://polygene.apache.org - New Energy for Java
>


Re: A "per operator instance" window all ?

2018-02-18 Thread Xingcan Cui
Hi Julien,

sorry for my misunderstanding before. For now, the window can only be defined 
on a KeyedStream or an ordinary DataStream but with parallelism = 1. I’d like 
to provide three options for your scenario.

1. If your external data is static and can fit into memory, you can use 
ManagedStates to cache them without considering the querying problem.
2. Or you can use a CustomPartitioner to manually distribute your alert data 
and simulate a window operation by yourself in a ProcessFunction.
3. You may also choose to use some external systems such as an in-memory store, 
which can work as a cache for your queries.

Best,
Xingcan

> On 19 Feb 2018, at 5:55 AM, Julien  wrote:
> 
> Hi Xingcan,
> 
> Thanks for your answer.
> Yes, I understand that point:
> if I have 100 resource IDs with parallelism of 4, then each operator instance 
> will handle about 25 keys
> 
> The issue I have is that I want, on a given operator instance, to group those 
> 25 keys together in order to do only 1 query to an external system per 
> operator instance:
> 
> on a given operator instance, I will do 1 query for my 25 keys
> so with the 4 operator instances, I will do 4 queries in parallel (with about 
> 25 keys per query)
> 
> I do not know how I can do that.
> 
> If I define a window on my keyed stream (with for example 
> stream.keyBy(_.resourceId).window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))),
>  then my understanding is that the window is "associated" with the key. So in 
> this case, on a given operator instance, I will have 25 of those windows (one 
> per key), and I will do 25 queries (instead of 1).
> 
> Do you understand my point ?
> Or maybe am I missing something ?
> 
> I'd like to find a way on operator instance 1 to group all the alerts 
> received on those 25 resource ids and do 1 query for those 25 resource ids.
> Same thing for operator instance 2, 3 and 4.
> 
> 
> Thank you,
> Regards.
> 
> 
> On 18/02/2018 14:43, Xingcan Cui wrote:
>> Hi Julien,
>> 
>> the cardinality of your keys (e.g., resource ID) will not be restricted to 
>> the parallelism. For instance, if you have 100 resource IDs processed by 
>> KeyedStream with parallelism 4, each operator instance will handle about 25 
>> keys. 
>> 
>> Hope that helps.
>> 
>> Best,
>> Xingcan
>> 
>>> On 18 Feb 2018, at 8:49 PM, Julien wrote:
>>> 
>>> Hi,
>>> 
>>> I am pretty new to flink and I don't know what will be the best way to deal 
>>> with the following use case:
>>> 
>>> as an input, I receive some alerts from a Kafka topic
>>> an alert is linked to a network resource (like router-1, router-2, 
>>> switch-1, switch-2, ...)
>>> so an alert has two main pieces of information (the alert id and the resource id of 
>>> the resource on which this alert has been raised)
>>> then I need to do a query to an external system in order to enrich the 
>>> alert with additional information on the resource
>>> 
>>> (A "natural" candidate for the key on this stream will be the resource id)
>>> 
>>> The issue I have is that regarding the query to the external system:
>>> I do not want to do 1 query per resource id
>>> I want to do a small number of queries in parallel (for example 4 queries 
>>> in parallel every 500ms), each query requesting the external system for 
>>> several alerts linked to several resource ids
>>> Currently, I don't know what will be the best way to deal with that:
>>> I can key my stream on the resource id and then define a processing time 
>>> window of 500ms and when the trigger is ok, then I do my query
>>> by doing so, I will "group" several alerts in a single query, but they will 
>>> all be linked to the same resource.
>>> so I will do 1 query per resource id (which will be too much in my use case)
>>> I can also do a windowAll on a non keyed stream
>>> by doing so, I will "group" together alerts from different resource ids, 
>>> but from what I've read in such a case the parallelism will always be one.
>>> so in this case, I will only do 1 query whereas I'd like to have some 
>>> parallelism
>>> I am thinking that a way to deal with that will be:
>>> 
>>> define the resource id as the key of stream and put a parallelism of 4
>>> and then having a way to do a windowAll on this keyed stream
>>> which is that, on a given operator instance, I will "group" on the same 
>>> window all the keys (ie all the resource ids) managed by this operator 
>>> instance
>>> with a parallelism of 4, I will do 4 queries in parallel (1 per operator 
>>> instance, and each query will be for several alerts linked to several 
>>> resource ids)
>>> But after looking at the documentation, I cannot see this ability (having a 
>>> windowAll on a keyed stream).
>>> 
>>> 

Need to understand the execution model of the Flink

2018-02-18 Thread Darshan Singh
Hi I would like to understand the execution model.

1. I have a csv file which is, say, 10 GB.
2. I created a table from this file.

3. Now I have created filtered tables on this, say 10 of them.
4. Now I created a writeToSink for all these 10 filtered tables.

Now my question is: will these 10 filtered tables be written in
parallel (suppose I have 40 cores and set the parallelism to say 40 as well)?

Next question I have is that the table which I created from the csv file,
which is common, won't be persisted by Flink internally; rather, for all 10
filtered tables it will read the csv file and then apply the filter and write
to the sink.

I think that for all 10 filtered tables it will read the csv again and again; in
this case it will be read 10 times. Is my understanding correct or am I
missing something?

What if in step 2 I change the table to a dataset and back?

Thanks


Re: Correlation between number of operators and Job manager memory requirements

2018-02-18 Thread Pawel Bartoszek
Hi,

You could definitely try to find a formula for the heap size, but isn't it
easier just to try out different memory settings and see which works best
for you?
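
For reference, the knobs to experiment with in flink-conf.yaml would be along these
lines (key names as in the 1.4-era configuration docs; the values are only starting
points to try):

jobmanager.heap.mb: 2048
taskmanager.heap.mb: 4096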

Thanks,
Pawel

On 17 Feb 2018 at 12:26, "Shailesh Jain"  wrote:

Oops, hit send by mistake.

In the configuration section, it is mentioned that for "many operators"
heap size should be increased.

"JVM heap size (in megabytes) for the JobManager. You may have to increase
the heap size for the JobManager if you are running very large applications
(with many operators), or if you are keeping a long history of them."

Is there any recommendation on the heap space required when there are
around 200 CEP operators, and close to 80 Filter operators?

Any other leads on calculating the expected heap space allocation to start
the job would be really helpful.

Thanks,
Shailesh



On Sat, Feb 17, 2018 at 5:53 PM, Shailesh Jain 
wrote:

> Hi,
>
> I have a flink job with almost 300 operators, and every time I'm trying to
> submit the job, the cluster crashes with OutOfMemory exception.
>
> I have 1 job manager and 1 task manager with 2 GB heap space allocated to
> both.
>
> In the configuration section of the documentation
>
>
>
>


A "per operator instance" window all ?

2018-02-18 Thread Julien

Hi,

I am pretty new to flink and I don't know what will be the best way to 
deal with the following use case:


 * as an input, I receive some alerts from a Kafka topic
 o an alert is linked to a network resource (like router-1,
   router-2, switch-1, switch-2, ...)
 o so an alert has two main pieces of information (the alert id and the
   resource id of the resource on which this alert has been raised)
 * then I need to do a query to an external system in order to enrich
   the alert with additional information on the resource


(A "natural" candidate for the key on this stream will be the resource id)

The issue I have regards the query to the external system:

 * I do not want to do 1 query per resource id
 * I want to do a small number of queries in parallel (for example 4
   queries in parallel every 500ms), each query requesting the external
   system for several alerts linked to several resource ids

Currently, I don't know what will be the best way to deal with that:

 * I can key my stream on the resource id and then define a processing
   time window of 500ms and when the trigger is ok, then I do my query
 o by doing so, I will "group" several alerts in a single query,
   but they will all be linked to the same resource.
 o so I will do 1 query per resource id (which will be too much in
   my use case)
 * I can also do a windowAll on a non keyed stream
 o by doing so, I will "group" together alerts from different
   resource ids, but from what I've read in such a case the
   parallelism will always be one.
 o so in this case, I will only do 1 query whereas I'd like to have
   some parallelism

I am thinking that a way to deal with that will be:

 * define the resource id as the key of stream and put a parallelism of 4
 * and then having a way to do a windowAll on this keyed stream
 o which is that, on a given operator instance, I will "group" on
   the same window all the keys (ie all the resource ids) managed
   by this operator instance
 o with a parallelism of 4, I will do 4 queries in parallel (1 per
   operator instance, and each query will be for several alerts
   linked to several resource ids)

But after looking at the documentation, I cannot see this ability 
(having a windowAll on a keyed stream).


Am I missing something?

What will be the best way to deal with such a use case?


I've tried for example to review my key and to do something like 
"resourceId.hashCode() % parallelism" and then to use a time window.


In my example above, the parallelism will be 4, and all my keys will be 0, 1, 2 
or 3.


The issue with this approach is that, due to the way the operator index is 
computed from the key, it does not distribute my processing well:


 * when this partitioning logic from the "KeyGroupRangeAssignment"
   class is applied
 o    /**
       * Assigns the given key to a parallel operator index.
       *
       * @param key the key to assign
       * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
       * @param parallelism the current parallelism of the operator
       * @return the index of the parallel operator to which the given key should be routed.
       */
      public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
          return computeOperatorIndexForKeyGroup(maxParallelism, parallelism,
                  assignToKeyGroup(key, maxParallelism));
      }

      /**
       * Assigns the given key to a key-group index.
       *
       * @param key the key to assign
       * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
       * @return the key-group to which the given key is assigned
       */
      public static int assignToKeyGroup(Object key, int maxParallelism) {
          return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
      }
 o keys 0, 1, 2 and 3 are only assigned to operators 2 and 3 (so 2
   of my 4 operators will not have anything to do)
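
To see the skew concretely, here is a small sketch that feeds the four small keys
through that same assignment logic (it needs flink-runtime on the classpath and
assumes the default max parallelism of 128):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyDistributionCheck {

    public static void main(String[] args) {
        int parallelism = 4;
        int maxParallelism = 128; // assumed default when no max parallelism is set

        // print which operator instance each of the four small keys is routed to
        for (int key = 0; key < parallelism; key++) {
            int instance = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                    key, maxParallelism, parallelism);
            System.out.println("key " + key + " -> operator instance " + instance);
        }
    }
}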


So, what will be the best way to deal with that?


Thank you in advance for your support.

Regards.


Julien.




Re: Only a single message processed

2018-02-18 Thread Xingcan Cui
Hi Niclas,

About the second point you mentioned, was the processed message a random one or 
a fixed one? 

The default startup mode for FlinkKafkaConsumer is StartupMode.GROUP_OFFSETS, 
maybe you could try StartupMode.EARLIEST while debugging. Also, before that, you 
may try fetching the messages with the Kafka console consumer tool to see 
whether they can be consumed completely.
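
For example, a minimal consumer sketch that starts from the earliest offset while
debugging (the topic name, servers and group id are placeholders):

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class EarliestOffsetSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "debug-consumer-group");    // placeholder

        FlinkKafkaConsumer011<String> consumer =
                new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), props);

        // ignore any committed group offsets and always read the topic from the beginning
        consumer.setStartFromEarliest();

        // for comparison, the console tool can read the same topic from the beginning:
        // bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
        //     --topic my-topic --from-beginning

        DataStream<String> messages = env.addSource(consumer);
        messages.print();

        env.execute("earliest-offset-sketch");
    }
}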

Besides, I wonder if you could provide the code for your Flink pipeline. That'll 
be helpful.

Best,
Xingcan



> On 18 Feb 2018, at 7:52 PM, Niclas Hedhman  wrote:
> 
> 
> So, the producer is run (at the moment) manually (command-line) one message 
> at a time.
> Kafka's tooling (different consumer group) shows that a message is added each 
> time.
> 
> Since my last post, I have also added a UUID as the key, and that didn't make 
> a difference, so you are likely correct about de-dup.
> 
> 
> There is only a single partition on the topic, so it shouldn't be a 
> partitioning issue.
> 
> I also noticed;
> 1. Sending a message while consumer topology is running, after the first 
> message, then that message will be processed after a restart.
> 
> 2. Sending many messages, while consumer is running, and then doing many 
> restarts will only process a single of those. No idea what happens to the 
> others.
> 
> I am utterly confused.
> 
> And digging in the internals is not for the faint-hearted, but the 
> kafka.poll() returns frequently with empty records.
> 
> Will continue debugging that tomorrow...
> 
> 
> Niclas
> 
> On Feb 18, 2018 18:50, "Fabian Hueske"  > wrote:
> Hi Niclas,
> 
> Flink's Kafka consumer should not apply any deduplication. AFAIK, such a 
> "feature" is not implemented.
> Do you produce into the topic that you want to read or is the data in the 
> topic static?
> If you do not produce in the topic while the consuming application is 
> running, this might be an issue with the start position of the consumer [1]. 
> 
> Best, Fabian
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>  
> 
> 
> 2018-02-18 8:14 GMT+01:00 Niclas Hedhman  >:
> Hi,
> I am pretty new to Flink, and I like what I see and have started to build my 
> first application using it.
> I must be missing something very fundamental. I have a FlinkKafkaConsumer011, 
> followed by a handful of filter, map and flatMap functions and terminated 
> with the standard CassandraSink. I have try..catch on all my own maps/filters 
> and the first message in the queue is processed after start-up, but any 
> additional messages are ignored, i.e. not reaching the first map(). Any 
> additional messages are swallowed (i.e. consumed but not forwarded).
> 
> I suspect that this is some type of de-duplication going on, since the (test) 
> producer of these messages. The producer provides different values on each, 
> but there is no "key" being passed to the KafkaProducer.
> 
> Is that required? And if so, why? Can I tell Flink or Flink's KafkaConsumer 
> to ingest all messages, and not try to de-duplicate them?
> 
> Thanks
> 
> --
> Niclas Hedhman, Software Developer
> http://zest.apache.org  - New Energy for Java
> 



Re: Only a single message processed

2018-02-18 Thread Niclas Hedhman
So, the producer is run (at the moment) manually (command-line) one message
at a time.
Kafka's tooling (different consumer group) shows that a message is added
each time.

Since my last post, I have also added a UUID as the key, and that didn't
make a difference, so you are likely correct about de-dup.


There is only a single partition on the topic, so it shouldn't be a
partitioning issue.

I also noticed;
1. Sending a message while consumer topology is running, after the first
message, then that message will be processed after a restart.

2. Sending many messages, while consumer is running, and then doing many
restarts will only process a single of those. No idea what happens to the
others.

I am utterly confused.

And digging in the internals is not for the faint-hearted, but the
kafka.poll() returns frequently with empty records.

Will continue debugging that tomorrow...


Niclas

On Feb 18, 2018 18:50, "Fabian Hueske"  wrote:

> Hi Niclas,
>
> Flink's Kafka consumer should not apply any deduplication. AFAIK, such a
> "feature" is not implemented.
> Do you produce into the topic that you want to read or is the data in the
> topic static?
> If you do not produce in the topic while the consuming application is
> running, this might be an issue with the start position of the consumer
> [1].
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/dev/connectors/kafka.html#kafka-consumers-
> start-position-configuration
>
> 2018-02-18 8:14 GMT+01:00 Niclas Hedhman :
>
>> Hi,
>> I am pretty new to Flink, and I like what I see and have started to build
>> my first application using it.
>> I must be missing something very fundamental. I have a
>> FlinkKafkaConsumer011, followed by a handful of filter, map and flatMap
>> functions and terminated with the standard CassandraSink. I have try..catch
>> on all my own maps/filters and the first message in the queue is processed
>> after start-up, but any additional messages are ignored, i.e. not reaching
>> the first map(). Any additional messages are swallowed (i.e. consumed but
>> not forwarded).
>>
>> I suspect that this is some type of de-duplication going on, since the
>> (test) producer of these messages. The producer provides different values on
>> each, but there is no "key" being passed to the KafkaProducer.
>>
>> Is that required? And if so, why? Can I tell Flink or Flink's
>> KafkaConsumer to ingest all messages, and not try to de-duplicate them?
>>
>> Thanks
>>
>> --
>> Niclas Hedhman, Software Developer
>> http://zest.apache.org - New Energy for Java
>>
>
>


Re: Only a single message processed

2018-02-18 Thread Fabian Hueske
Hi Niclas,

Flink's Kafka consumer should not apply any deduplication. AFAIK, such a
"feature" is not implemented.
Do you produce into the topic that you want to read or is the data in the
topic static?
If you do not produce in the topic while the consuming application is
running, this might be an issue with the start position of the consumer
[1].

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration

2018-02-18 8:14 GMT+01:00 Niclas Hedhman :

> Hi,
> I am pretty new to Flink, and I like what I see and have started to build
> my first application using it.
> I must be missing something very fundamental. I have a
> FlinkKafkaConsumer011, followed by a handful of filter, map and flatMap
> functions and terminated with the standard CassandraSink. I have try..catch
> on all my own maps/filters and the first message in the queue is processed
> after start-up, but any additional messages are ignored, i.e. not reaching
> the first map(). Any additional messages are swallowed (i.e. consumed but
> not forwarded).
>
> I suspect that this is some type of de-duplication going on, since the
> (test) producer of these messages. The producer provides different values on
> each, but there is no "key" being passed to the KafkaProducer.
>
> Is that required? And if so, why? Can I tell Flink or Flink's
> KafkaConsumer to ingest all messages, and not try to de-duplicate them?
>
> Thanks
>
> --
> Niclas Hedhman, Software Developer
> http://zest.apache.org - New Energy for Java
>