Hi Niels,

If it’s only for simple data filtering that does not depend on the key, a
simple "flatMap" or "filter" placed directly after the source will be chained
to the source instances.
That means the filtering runs in the same thread as the one fetching data from
the Kafka partition, so there is no extra network transfer for this simple
filtering.
You can read more about operator chaining here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/concepts/runtime.html#tasks-and-operator-chains

So, to sum it up: you have a FlinkKafkaConsumer as the source, apply a filter
transformation right after it, and then do a keyBy followed by your heavy,
key-wise computations.
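A minimal sketch of that pipeline in Java (purely illustrative: the topic name,
properties, connector version, and the filter/key/processing logic are
placeholder assumptions, not from your setup):

    import java.util.Properties;

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class FilterThenKeyByJob {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");  // placeholder
            props.setProperty("group.id", "clickstream-demo");         // placeholder

            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<String> filtered = env
                .addSource(new FlinkKafkaConsumer09<>("clicks", new SimpleStringSchema(), props))
                // chained to the source: the filter runs in the same thread that reads
                // the Kafka partition, so dropped records never cross the network
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String line) {
                        return !line.isEmpty();            // placeholder predicate
                    }
                });

            filtered
                // keyBy introduces hash partitioning; records are redistributed from here on
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String line) {
                        return line.split(",")[0];         // placeholder: first field as session id
                    }
                })
                // stand-in for the heavy, key-wise computation
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String line) {
                        return "processed: " + line;
                    }
                })
                .print();

            env.execute("filter-then-keyBy sketch");
        }
    }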
Does that make sense for what you have in mind?

Cheers,
Gordon

On January 11, 2017 at 4:11:26 PM, Niels Basjes (ni...@basjes.nl) wrote:

Hi,

Ok. I think I get it.

WHAT IF:
Assume we create an addKeyedSource(...) which allows us to add a source that
makes some guarantees about the data.
And assume this source simply returns the Kafka partition id as the result of
this 'hash' function.
Then, if I have 10 Kafka partitions, I would read these records in and could
filter the data more efficiently, because the data would not need to go over
the network before this filter.
Afterwards I can scale it up to 'many' tasks for the heavier processing that 
follows.
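Something along these lines (a purely hypothetical sketch; neither the
interface nor addKeyedSource exists in Flink, all names are made up to
illustrate the idea):

    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    // Hypothetical sketch only: a source-registration method that hands back a
    // KeyedStream directly, with the Kafka partition id acting as the 'hash',
    // so a chained filter could run without any shuffle before it.
    interface KeyedSourceSupport {
        <T> KeyedStream<T, Integer> addKeyedSource(SourceFunction<T> source);
    }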

As a concept: Could that be made to work?

Niels 

On Mon, Jan 9, 2017 at 9:14 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Hi Niels,

Thank you for bringing this up. I recall there was some previous discussion
related to this: [1].

I don’t think this is possible at the moment, mainly because of how the API is 
designed.

On the other hand, a KeyedStream in Flink is basically just a DataStream with a
hash partitioner that decides which parallel instance of the downstream
operator each emitted record of the stream is sent to.
So, even if we had a Kafka source that directly produced a KeyedStream on
"addSource", redistribution of data could still happen. I.e., if the
parallelism of the compute operators right after it differs from the number of
Kafka partitions, redistribution will happen so that the key space and state
are evenly distributed in Flink.
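To make that concrete, a small fragment in Java (env, props, and the
parallelism numbers are assumptions, reusing the style of the sketch earlier in
this thread):

    // Assume the topic has 10 partitions and the source runs with parallelism 10,
    // i.e. one source subtask per partition.
    DataStream<String> fromKafka = env
        .addSource(new FlinkKafkaConsumer09<>("clicks", new SimpleStringSchema(), props))
        .setParallelism(10);

    fromKafka
        .keyBy(new KeySelector<String, String>() {     // hash partitioner inserted here
            @Override
            public String getKey(String line) {
                return line.split(",")[0];             // placeholder session id
            }
        })
        .map(new MapFunction<String, String>() {       // placeholder keyed computation
            @Override
            public String map(String line) {
                return line;
            }
        })
        // parallelism 40 != 10 Kafka partitions, so the key space (and its state)
        // is spread over 40 subtasks and records are redistributed over the network
        .setParallelism(40);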

This leads to the question of whether retaining the original partitioning of
records in Kafka, when they are consumed by Flink, is actually only a special
case.
Flink, as a parallel compute engine, can freely adjust the parallelism of its
operators regardless of the parallelism of Kafka topics (rescaling isn’t
actually in yet, but is on the near-future roadmap).

So, in the general case, the parallelism of a Flink operator may differ from
the number of Kafka partitions, and therefore redistribution must occur.
For redistribution not to be needed right after an already partitioned Kafka
topic, you’d need 1) the number of Kafka partitions, 2) the number of Flink
source instances consuming those partitions, and 3) the parallelism of the
keyed computation afterwards to all be identical. This seems like a very
specific situation, considering that you’ll be able to rescale Flink operators
as the data’s key space / volume grows.

The main observation, I think, is that Flink itself maintains how the key space
is partitioned within the system, which plays a crucial part in rescaling.
That’s why, by default, it doesn’t respect the existing partitioning of the key
space in Kafka (or other external sources). Even if it does at the beginning of
a job, the partitioning will most likely change as you rescale your job /
operators (which is a good thing, to be able to adapt).

Cheers,
Gordon

[1] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/kafka-partition-assignment-td12123.html

On January 6, 2017 at 1:38:05 AM, Niels Basjes (ni...@basjes.nl) wrote:

Hi,

In my scenario I have clickstream data that I persist in Kafka.
I use the sessionId as the key to instruct Kafka to put everything with the
same sessionId into the same Kafka partition. That way I already have all
events of a visitor in a single Kafka partition, in a fixed order.
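For reference, roughly what that producer side looks like (a sketch; the topic
name, serializer settings, and example values are placeholders):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class SessionKeyedProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                String sessionId = "session-42";                 // placeholder
                String clickEvent = "{\"url\": \"/\"}";          // placeholder
                // key = sessionId: Kafka's default partitioner hashes the key, so all
                // events of one session go to the same partition, preserving order
                producer.send(new ProducerRecord<>("clicks", sessionId, clickEvent));
            }
        }
    }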

When I read this data into Flink I get a generic data stream, on top of which I
have to do a keyBy before my processing can continue. Such a keyBy will
redistribute the data again to the later tasks that do the actual work.

Is it possible to create an adapted version of the Kafka source that 
immediately produces a keyed data stream?
 

--
Best regards / Met vriendelijke groeten,

Niels Basjes



--
Best regards / Met vriendelijke groeten,

Niels Basjes
