Hi,

Turns out the issue was with the RichParallelSourceFunction I was using was
resulting in the Sink not getting called after the connect
SyncLatchFunction. Need to figure out the issue there but the 2 asyncInvoke
functions work fine now after I replaced the *ParallelCameraSource (*
RichParallelSourceFunction) with the old* CheckpointedCameraWithCubeSource.*





DataStream<CameraWithCube> keyedByCamCameraStream = env
        .addSource(*new *CheckpointedCameraWithCubeSource(maxSeqCnt,
servingSpeedMs, startTime, nbrCameras, outputFile), *"TileDB Camera"*)
        .uid(*"TileDB-Camera"*)
        .keyBy((cameraWithCube) -> cameraWithCube.*cameraKey *!= *null *?
                cameraWithCube.*cameraKey*.getCam() : *new *Object())
        .process(*new *ProcessFunction<CameraWithCube, CameraWithCube>() {
            @Override
            *public void *processElement(CameraWithCube value, Context ctx,
Collector<CameraWithCube> out) *throws *Exception {
                out.collect(value);
            }
        })
        .setParallelism(parallelCamTasks);











*/*DataStream<CameraWithCube> keyedByCamCameraStream = env
.addSource(new ParallelCameraSource(maxSeqCnt, servingSpeedMs, startTime,
nbrCameras, outputFile), "TileDB Camera")
.uid("TileDB-Camera")        .setParallelism(parallelCamTasks)
.partitionCustom((Partitioner<Integer>) (key, numPartitions) ->
{            return key % numPartitions;        }, new
KeySelector<CameraWithCube, Integer>() {            @Override
public Integer getKey(CameraWithCube cameraWithCube) throws Exception {
           ....;            }        });*/*



Vijay

On Thu, Jul 26, 2018 at 10:39 PM Vijay Balakrishnan <bvija...@gmail.com>
wrote:

> Hi,
>
> I have 2 AsyncFunctions SampleCopyAsyncFunction and
> SampleSinkAsyncFunction called with AsyncDataStream.unorderedWait. The 1st
>  AsyncDataStream.unorderedWait’s SampleCopyAsyncFunction .asyncInvoke
> gets called properly but the 2nd SampleSinkAsyncFunction.asyncInvoke never
> gets called(though open and close functions are called). Is there any way
> for me to have the 2nd asyncInvoke get called ? I have an 
> Executors.newFixedThreadPool(..)
> that I use within each AsyncFunction.
>
>
>
>
> TIA
>
>
>
>
>
> Here is the code:
>
>
>
> AsyncFunction<CameraWithCube, CameraWithCube> cameraWithCubeAsyncFunction =
>
>                 new SampleCopyAsyncFunction(shutdownWaitTS, inputFile,
> options, nThreads);
>
>         DataStream<CameraWithCube> cameraWithCubeDataStreamAsync =
>
>                 AsyncDataStream.unorderedWait(keyedByCamCameraStream,
> cameraWithCubeAsyncFunction, timeout, TimeUnit.MILLISECONDS, nCapacity)
>
>
> .setParallelism(parallelCamTasks);//.startNewChain()
>
>         DataStream<CameraWithCube> cameraWithCubeDataStream =
> cameraWithCubeDataStreamAsync.keyBy((cameraWithCube) ->
> cameraWithCube.cameraKey != null ?
>
>                 cameraWithCube.cameraKey.getTs() : new Object());
>
>         String uuid = UUID.randomUUID().toString();
>
>         DataStream<Tuple2<InputMetadata, CameraWithCube>>
> enrichedCameraFeed = inputMetadataDataStream
>
>                 .connect(cameraWithCubeDataStream)
>
>                 .flatMap(new SyncLatchFunction(outputFile, outputPath,
> uuid))
>
>                 .uid("connect2Streams")
>
>                 .setParallelism(1);
>
>         AsyncFunction<Tuple2<InputMetadata, CameraWithCube>,
> Tuple2<InputMetadata, CameraWithCube>> cubeSinkAsyncFunction =
>
>                 new SampleSinkAsyncFunction(shutdownWaitTS, outputPath,
> options, nThreads, uuid);
>
>         DataStream<Tuple2<InputMetadata, CameraWithCube>>
> enrichedCameraFeedSinkAsync =
>
>                 AsyncDataStream.unorderedWait(enrichedCameraFeed,
> cubeSinkAsyncFunction, timeout, TimeUnit.MILLISECONDS, nCapacity)
>
>                         .setParallelism(parallelCubeTasks)
>
>                         .uid("Read-Image-Async");//ç== asyncInvoke never
> gets called for 2nd AsyncFunction
>
>         DataStream<Tuple2<InputMetadata, CameraWithCube>>
> enrichedCameraFeedSinkAsyncDataStream =
> enrichedCameraFeedSinkAsync.keyBy((tuple2) -> tuple2.f0.inputMetadataKey !=
> null ?
>
>                 tuple2.f0.inputMetadataKey.getTs() : new Object());
>
>         //enrichedCameraFeedSinkAsyncDataStream.print();ç this doesn’t
> work
>
>         enrichedCameraFeedSinkAsyncDataStream.addSink(new
> CubeProcessingSink(options, outputPath, uuid)) //, shutdownWaitTS
>
>                 .setParallelism(parallelCubeTasks)
>
>                 .uid("Cube-Sink");
>

Reply via email to