Hi,

I have two AsyncFunctions, SampleCopyAsyncFunction and SampleSinkAsyncFunction,
each applied with AsyncDataStream.unorderedWait. For the first
AsyncDataStream.unorderedWait, SampleCopyAsyncFunction.asyncInvoke gets called
properly, but for the second one, SampleSinkAsyncFunction.asyncInvoke never
gets called (though its open and close functions are called). Is there any way
to get the second asyncInvoke called? I use an Executors.newFixedThreadPool(..)
within each AsyncFunction.

TIA

Here is the code:



        AsyncFunction<CameraWithCube, CameraWithCube> cameraWithCubeAsyncFunction =
                new SampleCopyAsyncFunction(shutdownWaitTS, inputFile, options, nThreads);

        DataStream<CameraWithCube> cameraWithCubeDataStreamAsync =
                AsyncDataStream.unorderedWait(keyedByCamCameraStream, cameraWithCubeAsyncFunction,
                                timeout, TimeUnit.MILLISECONDS, nCapacity)
                        .setParallelism(parallelCamTasks); //.startNewChain()

        DataStream<CameraWithCube> cameraWithCubeDataStream =
                cameraWithCubeDataStreamAsync.keyBy((cameraWithCube) ->
                        cameraWithCube.cameraKey != null ? cameraWithCube.cameraKey.getTs() : new Object());

        String uuid = UUID.randomUUID().toString();

        DataStream<Tuple2<InputMetadata, CameraWithCube>> enrichedCameraFeed =
                inputMetadataDataStream
                        .connect(cameraWithCubeDataStream)
                        .flatMap(new SyncLatchFunction(outputFile, outputPath, uuid))
                        .uid("connect2Streams")
                        .setParallelism(1);

        AsyncFunction<Tuple2<InputMetadata, CameraWithCube>, Tuple2<InputMetadata, CameraWithCube>>
                cubeSinkAsyncFunction =
                        new SampleSinkAsyncFunction(shutdownWaitTS, outputPath, options, nThreads, uuid);

        DataStream<Tuple2<InputMetadata, CameraWithCube>> enrichedCameraFeedSinkAsync =
                AsyncDataStream.unorderedWait(enrichedCameraFeed, cubeSinkAsyncFunction,
                                timeout, TimeUnit.MILLISECONDS, nCapacity)
                        .setParallelism(parallelCubeTasks)
                        .uid("Read-Image-Async"); // <== asyncInvoke never gets called for 2nd AsyncFunction

        DataStream<Tuple2<InputMetadata, CameraWithCube>> enrichedCameraFeedSinkAsyncDataStream =
                enrichedCameraFeedSinkAsync.keyBy((tuple2) ->
                        tuple2.f0.inputMetadataKey != null ? tuple2.f0.inputMetadataKey.getTs() : new Object());

        //enrichedCameraFeedSinkAsyncDataStream.print(); <== this doesn’t work

        enrichedCameraFeedSinkAsyncDataStream
                .addSink(new CubeProcessingSink(options, outputPath, uuid)) //, shutdownWaitTS
                .setParallelism(parallelCubeTasks)
                .uid("Cube-Sink");
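
The thread-pool pattern mentioned in the question can be sketched without Flink on the classpath, roughly as below. This is only an illustrative assumption about what such an asyncInvoke might look like, not the actual SampleCopyAsyncFunction or SampleSinkAsyncFunction: ResultCallback is a hypothetical stand-in for Flink's ResultFuture, and PooledAsyncSketch, invoke and the "copied:" placeholder work are made up for the example.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative only: a Flink-free sketch of "fixed thread pool inside an async function".
class PooledAsyncSketch {

    // Hypothetical stand-in for Flink's ResultFuture (assumption, not the real interface).
    interface ResultCallback<T> {
        void complete(Collection<T> results);
        void completeExceptionally(Throwable error);
    }

    private final ExecutorService pool;

    PooledAsyncSketch(int nThreads) {
        // In a rich async function this would typically be created in open().
        this.pool = Executors.newFixedThreadPool(nThreads);
    }

    // Same shape as asyncInvoke(input, resultFuture): returns at once, work runs on the pool.
    void asyncInvoke(String input, ResultCallback<String> callback) {
        pool.submit(() -> {
            try {
                String result = "copied:" + input; // placeholder for the real blocking work
                callback.complete(Collections.singletonList(result));
            } catch (Throwable t) {
                // Always complete the callback, otherwise the element never leaves the operator.
                callback.completeExceptionally(t);
            }
        });
    }

    // Convenience bridge so a caller can wait on a CompletableFuture.
    CompletableFuture<Collection<String>> invoke(String input) {
        CompletableFuture<Collection<String>> done = new CompletableFuture<>();
        asyncInvoke(input, new ResultCallback<String>() {
            @Override public void complete(Collection<String> results) { done.complete(results); }
            @Override public void completeExceptionally(Throwable error) { done.completeExceptionally(error); }
        });
        return done;
    }

    // In a rich async function this would typically be called from close().
    void close() {
        pool.shutdown();
    }

    public static void main(String[] args) throws Exception {
        PooledAsyncSketch fn = new PooledAsyncSketch(2);
        System.out.println(fn.invoke("frame-1").get(5, TimeUnit.SECONDS));
        fn.close();
    }
}
```

The point of the sketch is that every path through the submitted task completes the callback, either normally or exceptionally; whether that matches the two functions used in the job above is not shown in this post.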
