Hi, I have 2 AsyncFunctions SampleCopyAsyncFunction and SampleSinkAsyncFunction called with AsyncDataStream.unorderedWait. The 1st AsyncDataStream.unorderedWait’s SampleCopyAsyncFunction .asyncInvoke gets called properly but the 2nd SampleSinkAsyncFunction.asyncInvoke never gets called(though open and close functions are called). Is there any way for me to have the 2nd asyncInvoke get called ? I have an Executors.newFixedThreadPool(..) that I use within each AsyncFunction.
TIA Here is the code: AsyncFunction<CameraWithCube, CameraWithCube> cameraWithCubeAsyncFunction = new SampleCopyAsyncFunction(shutdownWaitTS, inputFile, options, nThreads); DataStream<CameraWithCube> cameraWithCubeDataStreamAsync = AsyncDataStream.unorderedWait(keyedByCamCameraStream, cameraWithCubeAsyncFunction, timeout, TimeUnit.MILLISECONDS, nCapacity) .setParallelism(parallelCamTasks);//.startNewChain() DataStream<CameraWithCube> cameraWithCubeDataStream = cameraWithCubeDataStreamAsync.keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ? cameraWithCube.cameraKey.getTs() : new Object()); String uuid = UUID.randomUUID().toString(); DataStream<Tuple2<InputMetadata, CameraWithCube>> enrichedCameraFeed = inputMetadataDataStream .connect(cameraWithCubeDataStream) .flatMap(new SyncLatchFunction(outputFile, outputPath, uuid)) .uid("connect2Streams") .setParallelism(1); AsyncFunction<Tuple2<InputMetadata, CameraWithCube>, Tuple2<InputMetadata, CameraWithCube>> cubeSinkAsyncFunction = new SampleSinkAsyncFunction(shutdownWaitTS, outputPath, options, nThreads, uuid); DataStream<Tuple2<InputMetadata, CameraWithCube>> enrichedCameraFeedSinkAsync = AsyncDataStream.unorderedWait(enrichedCameraFeed, cubeSinkAsyncFunction, timeout, TimeUnit.MILLISECONDS, nCapacity) .setParallelism(parallelCubeTasks) .uid("Read-Image-Async");//ç== asyncInvoke never gets called for 2nd AsyncFunction DataStream<Tuple2<InputMetadata, CameraWithCube>> enrichedCameraFeedSinkAsyncDataStream = enrichedCameraFeedSinkAsync.keyBy((tuple2) -> tuple2.f0.inputMetadataKey != null ? tuple2.f0.inputMetadataKey.getTs() : new Object()); //enrichedCameraFeedSinkAsyncDataStream.print();ç this doesn’t work enrichedCameraFeedSinkAsyncDataStream.addSink(new CubeProcessingSink(options, outputPath, uuid)) //, shutdownWaitTS .setParallelism(parallelCubeTasks) .uid("Cube-Sink");