Hi Rakkesh,

Did you call `execute()`on your `StreamExecutionEnvironment`?

Best,
Xingcan 

> On Jul 18, 2018, at 5:12 PM, Titus Rakkesh <titus.rakk...@gmail.com> wrote:
> 
> Dear Friends,
>          I have 2 streams of the below data types.
> 
> DataStream<Tuple3<String, Integer, Double>> splittedActivationTuple;
> 
> DataStream<Tuple2<String, Double>> unionReloadsStream;
> 
> These streams are getting data from Kafka and getting data in different 
> frequencies. "unionReloadsStream"  will receive more data than 
> "splittedActivationTuple". I need to store  "splittedActivationTuple" in a 
> Window of 24 hours and manipulate its "Double" field, if a matching data 
> comes from unionReloadsStream (String field is the common field).
> 
> So I wrote the following method to do this task.
> 
> 
> public static DataStream<Tuple3<String, Integer, Double>> 
> joinActivationsBasedOnReload(
>             DataStream<Tuple3<String, Integer, Double>> activationsStream,
>             DataStream<Tuple2<String, Double>> unifiedReloadStream) {
>         
>         return activationsStream.join(unifiedReloadStream).where(new 
> ActivationStreamSelector())
>                 .equalTo(new 
> ReloadStreamSelector()).window(GlobalWindows.create())
>                 .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)))
>                 .apply(new JoinFunction<Tuple3<String, Integer, Double>, 
> Tuple2<String, Double>, Tuple3<String, Integer, Double>>() {
>                     private static final long serialVersionUID = 1L;
>                     @Override
>                     public Tuple3<String, Integer, Double> 
> join(Tuple3<String, Integer, Double> first,
>                             Tuple2<String, Double> second) {
>                         return new Tuple3<String, Integer, Double>(first.f0, 
> first.f1, first.f2 + second.f1);
>                     }
>                 });
>     }
> 
> 
> and calling as,
> 
> DataStream<Tuple3<String, Integer, Double>> activationWindowStream = 
> joinActivationsBasedOnReload(splittedActivationTuple, unionReloadsStream);
>         
> activationWindowStream.print();
> 
> 
> But I couldn't see anything printing. 
> 
> I expected "activationWindowStream" to contain the "splittedActivationTuple" 
> (smaller set) data and the Double value accumulated if  unionReloadsStream's 
> incoming elements have a matching "String" field. But that is not happening. 
> Where I am missing?
> 
> Thanks,
> Rakkesh

Reply via email to