Hi, Turns out the issue was with the RichParallelSourceFunction I was using was resulting in the Sink not getting called after the connect SyncLatchFunction. Need to figure out the issue there but the 2 asyncInvoke functions work fine now after I replaced the *ParallelCameraSource (* RichParallelSourceFunction) with the old* CheckpointedCameraWithCubeSource.*
DataStream<CameraWithCube> keyedByCamCameraStream = env .addSource(*new *CheckpointedCameraWithCubeSource(maxSeqCnt, servingSpeedMs, startTime, nbrCameras, outputFile), *"TileDB Camera"*) .uid(*"TileDB-Camera"*) .keyBy((cameraWithCube) -> cameraWithCube.*cameraKey *!= *null *? cameraWithCube.*cameraKey*.getCam() : *new *Object()) .process(*new *ProcessFunction<CameraWithCube, CameraWithCube>() { @Override *public void *processElement(CameraWithCube value, Context ctx, Collector<CameraWithCube> out) *throws *Exception { out.collect(value); } }) .setParallelism(parallelCamTasks); */*DataStream<CameraWithCube> keyedByCamCameraStream = env .addSource(new ParallelCameraSource(maxSeqCnt, servingSpeedMs, startTime, nbrCameras, outputFile), "TileDB Camera") .uid("TileDB-Camera") .setParallelism(parallelCamTasks) .partitionCustom((Partitioner<Integer>) (key, numPartitions) -> { return key % numPartitions; }, new KeySelector<CameraWithCube, Integer>() { @Override public Integer getKey(CameraWithCube cameraWithCube) throws Exception { ....; } });*/* Vijay On Thu, Jul 26, 2018 at 10:39 PM Vijay Balakrishnan <bvija...@gmail.com> wrote: > Hi, > > I have 2 AsyncFunctions SampleCopyAsyncFunction and > SampleSinkAsyncFunction called with AsyncDataStream.unorderedWait. The 1st > AsyncDataStream.unorderedWait’s SampleCopyAsyncFunction .asyncInvoke > gets called properly but the 2nd SampleSinkAsyncFunction.asyncInvoke never > gets called(though open and close functions are called). Is there any way > for me to have the 2nd asyncInvoke get called ? I have an > Executors.newFixedThreadPool(..) > that I use within each AsyncFunction. > > > > > TIA > > > > > > Here is the code: > > > > AsyncFunction<CameraWithCube, CameraWithCube> cameraWithCubeAsyncFunction = > > new SampleCopyAsyncFunction(shutdownWaitTS, inputFile, > options, nThreads); > > DataStream<CameraWithCube> cameraWithCubeDataStreamAsync = > > AsyncDataStream.unorderedWait(keyedByCamCameraStream, > cameraWithCubeAsyncFunction, timeout, TimeUnit.MILLISECONDS, nCapacity) > > > .setParallelism(parallelCamTasks);//.startNewChain() > > DataStream<CameraWithCube> cameraWithCubeDataStream = > cameraWithCubeDataStreamAsync.keyBy((cameraWithCube) -> > cameraWithCube.cameraKey != null ? > > cameraWithCube.cameraKey.getTs() : new Object()); > > String uuid = UUID.randomUUID().toString(); > > DataStream<Tuple2<InputMetadata, CameraWithCube>> > enrichedCameraFeed = inputMetadataDataStream > > .connect(cameraWithCubeDataStream) > > .flatMap(new SyncLatchFunction(outputFile, outputPath, > uuid)) > > .uid("connect2Streams") > > .setParallelism(1); > > AsyncFunction<Tuple2<InputMetadata, CameraWithCube>, > Tuple2<InputMetadata, CameraWithCube>> cubeSinkAsyncFunction = > > new SampleSinkAsyncFunction(shutdownWaitTS, outputPath, > options, nThreads, uuid); > > DataStream<Tuple2<InputMetadata, CameraWithCube>> > enrichedCameraFeedSinkAsync = > > AsyncDataStream.unorderedWait(enrichedCameraFeed, > cubeSinkAsyncFunction, timeout, TimeUnit.MILLISECONDS, nCapacity) > > .setParallelism(parallelCubeTasks) > > .uid("Read-Image-Async");//ç== asyncInvoke never > gets called for 2nd AsyncFunction > > DataStream<Tuple2<InputMetadata, CameraWithCube>> > enrichedCameraFeedSinkAsyncDataStream = > enrichedCameraFeedSinkAsync.keyBy((tuple2) -> tuple2.f0.inputMetadataKey != > null ? > > tuple2.f0.inputMetadataKey.getTs() : new Object()); > > //enrichedCameraFeedSinkAsyncDataStream.print();ç this doesn’t > work > > enrichedCameraFeedSinkAsyncDataStream.addSink(new > CubeProcessingSink(options, outputPath, uuid)) //, shutdownWaitTS > > .setParallelism(parallelCubeTasks) > > .uid("Cube-Sink"); >