Hi,

This worked out after looking at
https://stackoverflow.com/questions/44436401/some-puzzles-for-the-operator-parallelism-in-flink?rq=1
Why can't I use setParallelism after keyBy? Is it not an operator?

DataStream<CameraWithCube> cameraWithCubeDataStream = env
        .addSource(new CameraWithCubeSource(cameraFile, delay, servingSpeedFactor))
        .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ?
                cameraWithCube.cameraKey.getCam() : new Object())
        //.setParallelism(parallelTasks) //??? Why can't I use setParallelism after keyBy? Is it not an operator?
        //.setMaxParallelism(parallelTasks) // https://stackoverflow.com/questions/44436401/some-puzzles-for-the-operator-parallelism-in-flink?rq=1
        .process(new ProcessFunction<CameraWithCube, CameraWithCube>() {
            @Override
            public void processElement(CameraWithCube cameraWithCube, Context context,
                    Collector<CameraWithCube> collector) throws Exception {
                logger.info("before thread sleep");
                Thread.sleep(500);
                logger.info("after thread sleep");
                collector.collect(cameraWithCube);
            }
        })
        .setParallelism(parallelTasks) //??? Do I need to set this, or will it take the parallelism from the earlier step?
        .setMaxParallelism(parallelTasks)
        .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ?
                cameraWithCube.cameraKey.getTs() : new Object());

TIA,
Vijay

On Wed, May 16, 2018 at 1:41 PM Jörn Franke <jornfra...@gmail.com> wrote:

> Just some advice - do not use sleep to simulate a heavy task. Use real
> data or generated data to simulate. This sleep is garbage from a software
> quality point of view. Furthermore, it is often forgotten etc.
>
> On 16. May 2018, at 22:32, Vijay Balakrishnan <bvija...@gmail.com> wrote:
>
> Hi,
> Newbie question - What I am trying to do is the following:
> CameraWithCubeSource source sends data-containing tuples of (cameraNbr,TS).
> 1. Need to partition data by cameraNbr.
> *2. Then sleep for 1 sec to simulate a heavy process in the task.*
> *3. Then need to partition data by TS and finally get the DataStream to
> connect with another DataStream.*
>
> DataStream<CameraWithCube> cameraWithCubeDataStream = env
>         .addSource(new CameraWithCubeSource(cameraFile, delay, servingSpeedFactor))
>         .setParallelism(parallelTasks)
>         .setMaxParallelism(parallelTasks)
>         .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ? //partition by cameraNbr
>                 cameraWithCube.cameraKey.getCam() : new Object());
> //sleep for 1 sec ???? how
> ((KeyedStream) cameraWithCubeDataStream).timeWindow(Time.seconds(1))
>         .apply(new WindowFunction<CameraWithCube, CameraWithCube, String, TimeWindow>() {
>             @Override
>             public void apply(String cameraKeyCam, TimeWindow timeWindow,
>                     Iterable<CameraWithCube> cameraWithCubesAssignedToWindow,
>                     Collector<CameraWithCube> collector) throws Exception {
>                 Thread.sleep(1000);
>                 cameraWithCubesAssignedToWindow.forEach(cameraWithCube -> collector.collect(cameraWithCube));
>             }
>         })//returning void here from apply ??
>         //partition by TS and return DataStream
>         .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ? //partition by cameraNbr
>                 cameraWithCube.cameraKey.getTS() : new Object());
> ;
> TIA,
> Vijay
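
A note on the setParallelism question at the top, as far as I understand it: keyBy returns a KeyedStream and only decides how records are routed to the parallel subtasks of the next operator, while setParallelism is declared on SingleOutputStreamOperator (what addSource, process, apply etc. return). So the call belongs on the operator after keyBy, and an operator without an explicit setting should fall back to the environment's default parallelism rather than inherit from the previous step. Below is a rough, stdlib-only sketch of that routing idea. The class and method names are made up for illustration, the values 128 and 4 are hypothetical, and Flink's real assignment also applies a murmur hash to key.hashCode(), which is left out here.

```java
// Simplified model of keyed routing; NOT Flink's actual code.
// Assumption: key group = hashCode mod maxParallelism (Flink additionally
// murmur-hashes the hashCode before taking the modulo).
final class KeyRoutingSketch {

    // Map a key to one of maxParallelism key groups.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Map a key group to a subtask index of the DOWNSTREAM operator.
    // The partitioning step has no subtasks of its own; it only needs to
    // know how many subtasks the next operator has.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // hypothetical value
        int parallelism = 4;      // hypothetical parallelism of the process() operator
        for (String cam : new String[] {"cam1", "cam2", "cam3"}) {
            int group = keyGroupFor(cam, maxParallelism);
            int subtask = subtaskFor(group, maxParallelism, parallelism);
            System.out.println(cam + " -> key group " + group + " -> subtask " + subtask);
        }
    }
}
```

One thing this makes visible: routing depends on the key's hashCode being stable. The `new Object()` fallback key in my code has an identity-based hashCode, so records with a null cameraKey would probably not be routed consistently; a fixed placeholder key would likely be safer.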