Hi,
I got this working after looking at
https://stackoverflow.com/questions/44436401/some-puzzles-for-the-operator-parallelism-in-flink?rq=1
Why can't I call setParallelism after keyBy? Is keyBy not an operator?
DataStream<CameraWithCube> cameraWithCubeDataStream = env
    .addSource(new CameraWithCubeSource(cameraFile, delay, servingSpeedFactor))
    .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ?
        cameraWithCube.cameraKey.getCam() : new Object())
    //.setParallelism(parallelTasks) //??? Why cannot I use setParallelism after keyBy-is it not an operator
    //.setMaxParallelism(parallelTasks) // https://stackoverflow.com/questions/44436401/some-puzzles-for-the-operator-parallelism-in-flink?rq=1
    .process(new ProcessFunction<CameraWithCube, CameraWithCube>() {
        @Override
        public void processElement(CameraWithCube cameraWithCube, Context context,
                Collector<CameraWithCube> collector) throws Exception {
            logger.info("before thread sleep");
            Thread.sleep(500);
            logger.info("after thread sleep");
            collector.collect(cameraWithCube);
        }
    })
    .setParallelism(parallelTasks) //???do I need to set this or will it take the parallelism from the earlier step ?
    .setMaxParallelism(parallelTasks)
    .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ?
        cameraWithCube.cameraKey.getTs() : new Object());
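
As far as I can tell, keyBy returns a KeyedStream, while setParallelism is defined on SingleOutputStreamOperator (what process, map, etc. return). So keyBy only describes how records are routed to the next operator, and the parallelism belongs to that operator. Below is a minimal plain-Java sketch of that idea. It is not Flink code: KeyRoutingSketch and subtaskFor are hypothetical names, and the hashCode-modulo routing is a simplification (Flink goes through key groups and its own hash).

```java
// Simplified model of keyed partitioning; NOT Flink's actual implementation.
// Assumption: a record is routed by hashing its key modulo the parallelism
// of the operator that follows keyBy.
final class KeyRoutingSketch {

    // Index of the downstream parallel subtask that would receive a record with this key.
    static int subtaskFor(Object key, int downstreamParallelism) {
        return Math.floorMod(key.hashCode(), downstreamParallelism);
    }

    public static void main(String[] args) {
        int parallelTasks = 4; // hypothetical parallelism of the operator after keyBy

        // Equal keys always land on the same subtask of the downstream operator.
        boolean stable = subtaskFor("cam1", parallelTasks) == subtaskFor("cam1", parallelTasks);
        System.out.println("same key, same subtask: " + stable);

        // The target index is only meaningful relative to the downstream parallelism.
        System.out.println("cam1 with parallelism 1 -> " + subtaskFor("cam1", 1));

        // A fresh Object per record is a distinct key every time, so records
        // falling back to new Object() are not grouped together.
        Object first = new Object();
        Object second = new Object();
        System.out.println("fallback keys equal: " + first.equals(second));
    }
}
```

If this model is right, it also suggests that the new Object() fallback in the pipeline above does not put null-key records into one group, since each call creates a different key. A fixed sentinel value would be one way around that.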
TIA,
Vijay
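
P.S. Following Jörn's advice quoted below, here is a rough sketch of what could replace Thread.sleep(500) in processElement: real CPU work on generated data. SimulatedLoad and burnCpu are hypothetical names, and the xorshift mixing is just one arbitrary way to keep the CPU busy; the number of rounds would need tuning to match the cost of the real task.

```java
// Hypothetical stand-in for a heavy task: performs real CPU work on generated
// data instead of blocking the thread with Thread.sleep.
final class SimulatedLoad {

    // Runs `rounds` xorshift64 steps starting from `seed` and sums the states.
    // Deterministic for a given (seed, rounds). Use a non-zero seed: with
    // seed 0 the state stays 0 and the checksum is always 0.
    static long burnCpu(long seed, int rounds) {
        long state = seed;
        long checksum = 0;
        for (int i = 0; i < rounds; i++) {
            state ^= state << 13;
            state ^= state >>> 7;
            state ^= state << 17;
            checksum += state; // depend on every step so the loop does real work
        }
        return checksum;
    }

    public static void main(String[] args) {
        // The round count is an arbitrary example value, not a calibrated one.
        long checksum = burnCpu(42L, 5_000_000);
        System.out.println("checksum = " + checksum);
    }
}
```

Inside processElement the call would be something like burnCpu(someSeed, rounds) before collector.collect(cameraWithCube), with the seed taken from the record.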
On Wed, May 16, 2018 at 1:41 PM Jörn Franke <[email protected]> wrote:
> Just some advice - do not use sleep to simulate a heavy task. Use real
> data or generated data to simulate the load. Sleep calls are garbage from a
> software quality point of view. Furthermore, they are often forgotten and
> left in the code.
>
> On 16. May 2018, at 22:32, Vijay Balakrishnan <[email protected]> wrote:
>
> Hi,
> Newbie question - What I am trying to do is the following:
> The CameraWithCubeSource source sends data containing tuples of (cameraNbr, TS).
> 1. Need to partition the data by cameraNbr.
> 2. Then sleep for 1 sec to simulate a heavy process in the task.
> 3. Then need to partition the data by TS and finally get the DataStream to
> connect with another DataStream.
>
> DataStream<CameraWithCube> cameraWithCubeDataStream = env
>     .addSource(new CameraWithCubeSource(cameraFile, delay, servingSpeedFactor))
>     .setParallelism(parallelTasks)
>     .setMaxParallelism(parallelTasks)
>     .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ? //partition by cameraNbr
>         cameraWithCube.cameraKey.getCam() : new Object());
> //sleep for 1 sec ???? how
> ((KeyedStream) cameraWithCubeDataStream).timeWindow(Time.seconds(1))
>     .apply(new WindowFunction<CameraWithCube, CameraWithCube, String, TimeWindow>() {
>         @Override
>         public void apply(String cameraKeyCam, TimeWindow timeWindow,
>                 Iterable<CameraWithCube> cameraWithCubesAssignedToWindow,
>                 Collector<CameraWithCube> collector) throws Exception {
>             Thread.sleep(1000);
>             cameraWithCubesAssignedToWindow.forEach(cameraWithCube -> collector.collect(cameraWithCube));
>         }
>     })//returning void here from apply ??
>     //partition by TS and return DataStream
>     .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ? //partition by cameraNbr
>         cameraWithCube.cameraKey.getTS() : new Object());
> ;
> TIA,
> Vijay
>
>