Hi all,

[Of course, right after hitting send I realized I could just do 
rides.getTransformation().setUid(“blah”), ditto for the fares stream. Might be 
something to add to the docs, or provide a .uid() method on KeyedStreams for 
syntactic sugar]

Just for grins, I disabled auto-generated UIDs for the taxi rides/fares state 
example in the online tutorial. 

            env.getConfig().disableAutoGeneratedUIDs();

I then added UIDs for all operators, sources & sinks. But I still get the 
following when calling env.getExecutionPlan() or env.execute():

java.lang.IllegalStateException: Auto generated UIDs have been disabled but no 
UID or hash has been assigned to operator Partition
        at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:297)
        at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformTwoInputTransform(StreamGraphGenerator.java:682)
        at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:252)
        at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:209)
        at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1529)
        at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionPlan(StreamExecutionEnvironment.java:1564)
        at com.citi.flink.RidesAndFaresTool.main(RidesAndFaresTool.java:63)

The simple workflow is:

        DataStream<TaxiRide> rides = env
                .addSource(new CheckpointedTaxiRideSource(ridesFile, 
servingSpeedFactor))
                .uid("source: taxi rides")
                .name("taxi rides")
                .filter((TaxiRide ride) -> ride.isStart)
                .uid("filter: only start rides")
                .name("only start rides")
                .keyBy((TaxiRide ride) -> ride.rideId);

        DataStream<TaxiFare> fares = env
                .addSource(new CheckpointedTaxiFareSource(faresFile, 
servingSpeedFactor))
                .uid("source: taxi fares")
                .name("taxi fares")
                .keyBy((TaxiFare fare) -> fare.rideId);

        DataStreamSink<Tuple2<TaxiRide, TaxiFare>> enriched = rides
                .connect(fares)
                .flatMap(new EnrichmentFunction())
                .uid("function: enrich rides with fares")
                .name("enrich rides with fares")
                .addSink(sink)
                .uid("sink: enriched taxi rides")
                .name("enriched taxi rides");

Internally the exception is thrown when the EnrichFunction (a 
RichCoFlatMapFunction) is being transformed by 
StreamGraphGenerator.transformTwoInputTransform().

This calls StreamGraphGenerator.transform() with the two inputs, but the 
Transformation for each input is a PartitionTransformation.

I don’t see a way to set the UID following the keyBy(), as a KeyedStream 
creates the PartitionTransformation without a UID.

Any insight into setting the UID properly here? Or should 
StreamGraphGenerator.transform() skip the no-uid check for 
PartitionTransformation, since that’s not an operator with state?

Thanks,

— Ken

--------------------------
Ken Krugler
http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr

Reply via email to