Fabian, All,
Along the same lines, we have a data source whose records carry a parent key 
and a child key. We need to key first by parent and then by child. If we want 
physical partitioning to happen first by parent key, with grouping by child 
key done locally within each partition, do we need to use a custom 
partitioner? Obviously we can keyBy twice, but I was wondering whether we can 
reduce the repartitioning overhead.
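
To make the question concrete, here is a minimal plain-Java sketch of the idea, not a Flink job. The class and method names are hypothetical. partitionByParent has the same shape as the partition(key, numPartitions) method of Flink's Partitioner interface, which can be passed to DataStream.partitionCustom(); note that partitionCustom() returns a plain DataStream rather than a KeyedStream, so the child grouping would have to be kept by hand (here, an in-memory map) instead of in keyed state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: route by parent key, then group by child key locally. */
class ParentChildRoutingSketch {

    /** Picks the partition from the parent key only, so all children of a parent stay together. */
    static int partitionByParent(String parentKey, int numPartitions) {
        // floorMod keeps the index non-negative even for negative hash codes.
        return Math.floorMod(parentKey.hashCode(), numPartitions);
    }

    /**
     * Groups the records that arrived at one partition by child key, in memory.
     * Each record is {parentKey, childKey, payload}.
     */
    static Map<String, List<String>> groupByChild(List<String[]> records) {
        Map<String, List<String>> groups = new HashMap<>();
        for (String[] record : records) {
            groups.computeIfAbsent(record[1], k -> new ArrayList<>()).add(record[2]);
        }
        return groups;
    }
}
```

This moves the data across the network only once (by parent), at the cost of managing the per-child grouping yourself.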
Thanks,
Ashish



On Thursday, June 28, 2018, 9:02 AM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Vijay,
Flink does not provide fine-grained control for placing keys on specific slots 
or machines. 
When you specify a key, it is up to Flink (i.e., its internal hash function) 
where the data is processed. This works well for large key spaces, but can be 
difficult if you have only a few keys.
So, even if you keyBy(cam) and handle the parallelization of seq# internally 
(which I would not recommend), it might still happen that the data of two 
cameras is processed on the same slot. The only way to change that would be to 
fiddle with the hash of your keys, but this might give you a completely 
different distribution when scaling out the application at a later point in 
time.
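
A simplified sketch of why this happens, in plain Java. It is not Flink's actual code: Flink hashes each key into a key group (bounded by the max parallelism) and maps key groups to parallel subtasks, but the mix() function below is only a stand-in for Flink's internal hash, and all class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified sketch of hash-based key-to-subtask assignment; not Flink's actual code. */
class KeyAssignmentSketch {

    /** Stand-in bit mixer (assumption: Flink applies its own murmur-style hash here). */
    static int mix(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return h & 0x7fffffff; // clear the sign bit so the result is non-negative
    }

    /** Maps a key to a key group in [0, maxParallelism). */
    static int keyGroup(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    /** Maps a key to a subtask index in [0, parallelism). */
    static int subtaskIndex(Object key, int maxParallelism, int parallelism) {
        return keyGroup(key, maxParallelism) * parallelism / maxParallelism;
    }

    /** Returns the subtask index chosen for each key, in input order. */
    static Map<String, Integer> assign(List<String> keys, int maxParallelism, int parallelism) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (String key : keys) {
            result.put(key, subtaskIndex(key, maxParallelism, parallelism));
        }
        return result;
    }
}
```

Calling assign() with a handful of camera ids and a small parallelism shows that nothing prevents two keys from landing on the same index, and calling it again with a different parallelism shows that the mapping can change when the job is rescaled.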
Best, Fabian

2018-06-26 19:54 GMT+02:00 Vijay Balakrishnan <bvija...@gmail.com>:

Hi Fabian,
Thanks once again for your reply. I need to get the data from each camera 
(cam) into one partition/slot and avoid moving the gigantic video data around 
as I perform other operations on it. For example, I can get seq#1 and seq#2 
for cam1 in the cam1 partition/slot and then run combine, split, parse, stitch 
etc. operations on them in multiple threads within the same cam1 partition.
I have the CameraKey defined as (cam, seq#) and then keyBy(cam) to get it into 
one partition (e.g., cam1). The idea is to then work within the cam1 partition 
on the various seq#'s (1, 2, etc.) on separate threads within the same 
slot/partition of the TaskManager.
The data is stored in EFS, keyed by a seq#/cam# folder structure.
Our actual problem is managing network bandwidth as a resource in each 
partition. We want to make sure that the processing of one camera (split into 
multiple seq# tasks) does not run on the same node as the processing of 
another camera, because in that case the network bandwidth required for 
storing the output of the process running in the partition would exceed the 
network bandwidth of the hardware. Camera processing is expected to run on the 
same hardware as the video decode step, which is an earlier sequential process 
in the same Dataflow pipeline.
I guess I might have to use a ThreadPool within each slot (cam partition) to 
work on each seq#?
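
As a rough illustration of that thread-pool idea, here is a plain-Java sketch outside of Flink. All names are hypothetical and processSeq() is a placeholder for the real per-sequence work. It only shows the fan-out/fan-in pattern; it bypasses Flink's own parallelism and says nothing about whether this is safe inside a Flink operator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch: process the seq#'s of one camera on a fixed thread pool. */
class SeqThreadPoolSketch {

    /** Placeholder for the real per-sequence work (decode, stitch, ...). */
    static String processSeq(String cam, int seq) {
        return cam + "/seq" + seq;
    }

    /** Submits one task per seq# and collects the results in submission order. */
    static List<String> processAll(String cam, List<Integer> seqs, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (final int seq : seqs) {
                Callable<String> task = () -> processSeq(cam, seq);
                futures.add(pool.submit(task));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get()); // blocks until that task has finished
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```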
TIA
On Tue, Jun 26, 2018 at 1:06 AM Fabian Hueske <fhue...@gmail.com> wrote:

Hi,
keyBy() does not work hierarchically. Each keyBy() overrides the previous 
partitioning. You can keyBy(cam, seq#), which guarantees that all records with 
the same (cam, seq#) are processed by the same parallel instance. However, 
Flink does not give any guarantees about how the (cam, seq#) partitions are 
distributed across slots (or even physical nodes).
Btw., why is it important that all records of the same cam are processed by 
the same physical node?
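
One way to express such a composite key is a small value class that a key selector returns, for example keyBy(c -> new CamSeqKey(c.getCam(), c.getTs())). This is a sketch under assumptions: the class name CamSeqKey and the field types are hypothetical. What matters is that equals() and hashCode() depend only on the two fields, so equal (cam, seq#) pairs always hash the same way.

```java
import java.util.Objects;

/** Hypothetical composite key holding both fields, for use as a single keyBy key. */
final class CamSeqKey {
    private final String cam;
    private final long seq;

    CamSeqKey(String cam, long seq) {
        this.cam = cam;
        this.seq = seq;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof CamSeqKey)) {
            return false;
        }
        CamSeqKey other = (CamSeqKey) o;
        return seq == other.seq && Objects.equals(cam, other.cam);
    }

    @Override
    public int hashCode() {
        // Deterministic: derived only from the field values, never from object identity.
        return Objects.hash(cam, seq);
    }

    @Override
    public String toString() {
        return "(" + cam + ", " + seq + ")";
    }
}
```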
Fabian

2018-06-25 21:36 GMT+02:00 Vijay Balakrishnan <bvija...@gmail.com>:

I see a .slotSharingGroup for SingleOutputStreamOperator, which can put 
parallel instances of operations in the same TM slot. I also see a 
CoLocationGroup, but I do not see a .coLocationGroup for 
SingleOutputStreamOperator to put a task on the same slot. It seems 
CoLocationGroup is defined at the JobVertex level and has nothing to do with 
SingleOutputStreamOperator. A TaskManager has many slots, and each slot runs 
many threads. I want to be able to put the cam1 partition (keyBy(cam)) in one 
slot and then use keyBy(seq#) to run on many threads within that cam1 slot.
Vijay
On Mon, Jun 25, 2018 at 10:10 AM Vijay Balakrishnan <bvija...@gmail.com> wrote:

Thanks, Fabian. Been reading your excellent book on Flink Streaming. Can't 
wait for more chapters. Attached a pic.


I have records with seq# 1 for both cam1 and cam2. I also have records with 
varying seq#'s. By partitioning on the cam field first (keyBy(cam)), I can get 
the cam1 partition on the same TaskManager instance/slot/vCore(???). Can I 
then have seq# 1 and seq# 2 of the cam1 partition run in different 
slots/threads on the same TaskManager instance (aka the cam1 partition) using 
keyBy(seq#) and setParallelism()? Can the forward strategy be used to achieve 
this?
TIA

On Mon, Jun 25, 2018 at 1:03 AM Fabian Hueske <fhue...@gmail.com> wrote:

Hi,
Flink distributes task instances to slots and does not expose physical 
machines. Records are partitioned to task instances by hash partitioning. It 
is also not possible to guarantee that the records in two different operators 
are sent to the same slot. Sharing information by side-passing it (e.g., via a 
file on a machine or in a static object) is an anti-pattern and should be 
avoided.
Best, Fabian

2018-06-24 20:52 GMT+02:00 Vijay Balakrishnan <bvija...@gmail.com>:


Hi,

I need to partition by cameraWithCube.getCam() first, using parallelCamTasks 
(passed in as args).

Then, within each partition, I need to partition again by 
cameraWithCube.getTs(), but I need to make sure each of the second partitions 
by getTs() runs on the same physical node.

How do I achieve that ?
DataStream<CameraWithCube> cameraWithCubeDataStream = env
        .addSource(new Source(....))
        // First partitioning: by camera.
        .keyBy(cameraWithCube -> cameraWithCube.getCam())
        .process(new ProcessFunction<CameraWithCube, CameraWithCube>() {
            @Override
            public void processElement(CameraWithCube cameraWithCube, Context context,
                    Collector<CameraWithCube> collector) throws Exception {
                // do nothing
            }
        })
        .slotSharingGroup("camSharingGroup") // TODO: how to add camera# of the partition
        .setParallelism(parallelCamTasks)
        // Second partitioning: by timestamp.
        .keyBy(cameraWithCube -> cameraWithCube.getTs())
        .process(new ProcessFunction<CameraWithCube, CameraWithCube>() {
            @Override
            public void processElement(CameraWithCube cameraWithCube, Context context,
                    Collector<CameraWithCube> collector) throws Exception {
                // TODO: process code
            }
        })
        .setParallelism(noOfSlotsInEachPhysicalNode) // TODO: how many parallel tasks within physical node
        .slotSharingGroup("??"); // TODO: in same physical node

TIA











