No problem. Make sure that three instances of your application weren't 
running in the background at the same time, which would produce 3x the 
expected output.
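
A quick way to confirm this from the sink side is to count how many results were written per window. The sketch below is plain Java with no Flink dependency. It assumes a hypothetical output record made of the window end timestamp and a key. Because the copies may differ slightly, it groups by window rather than by content. With a single running job every (window, key) pair should appear once, so a count of 3 suggests three job instances writing to the same topic. On the cluster itself, `bin/flink list` shows the jobs that are currently running.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DuplicateOutputCheck {

    // Hypothetical output record: end of the window it was produced for, plus its key.
    static final class OutputRecord {
        final long windowEnd;
        final String key;

        OutputRecord(long windowEnd, String key) {
            this.windowEnd = windowEnd;
            this.key = key;
        }
    }

    // Counts how often each (windowEnd, key) pair was written to the sink.
    // With exactly one job instance running, every count should be 1.
    static Map<String, Integer> copiesPerWindow(List<OutputRecord> records) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (OutputRecord r : records) {
            counts.merge(r.windowEnd + "/" + r.key, 1, Integer::sum);
        }
        return counts;
    }

    // Largest number of copies seen for any window; N suggests N jobs share the sink.
    static int maxCopies(List<OutputRecord> records) {
        return copiesPerWindow(records).values().stream()
                .mapToInt(Integer::intValue)
                .max()
                .orElse(0);
    }

    public static void main(String[] args) {
        // Three identical jobs would emit every window result three times.
        List<OutputRecord> sinkOutput = List.of(
                new OutputRecord(2000L, "key-1"),
                new OutputRecord(2000L, "key-1"),
                new OutputRecord(2000L, "key-1"),
                new OutputRecord(4000L, "key-1"),
                new OutputRecord(4000L, "key-1"),
                new OutputRecord(4000L, "key-1"));
        System.out.println(copiesPerWindow(sinkOutput)); // {2000/key-1=3, 4000/key-1=3}
        System.out.println("max copies: " + maxCopies(sinkOutput)); // max copies: 3
    }
}
```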

Piotrek

> On Jun 19, 2017, at 5:25 PM, FRANCISCO BORJA ROBLES MARTIN 
> <francisco.robles.mar...@alumnos.upm.es> wrote:
> 
> Hello Piotrek!
> 
> Thanks for answering! Yes, I have already changed the "TimeCharacteristic" to 
> "ProcessingTime". I need it for the ".setWriteTimestampToKafka(true)" option, 
> as I use the timestamp in the Kafka consumer that reads this app's output. I 
> have also changed the code a bit to use KeyedStreams, so that I can use 
> parallelism in the window/reduce functions.
> 
> As for the problem: yesterday I noticed that it was getting worse as I 
> submitted more times. It was producing 3x the output (with small differences 
> in each copy, as you can see in my first message), whereas before it had 
> produced only 2x. Finally I stopped the cluster (stop-cluster.sh) and started 
> it again (start-cluster.sh), and the problem was solved. I have been trying to 
> reproduce the problem today by submitting the app several times, but I 
> haven't managed to. If it happens again, I will try to reproduce it with the 
> smallest possible code to find where the bug might be (something seems to go 
> wrong when submitting several times).
> 
> Kind regards!
> Fran.
> 
> 
> On 2017-06-19 14:43, Piotr Nowojski wrote:
>> Hi,
>> It is difficult for me to respond fully to your question. First of all,
>> it would be really useful if you could strip your example down to a
>> minimal version that shows the problem. Unfortunately I was unable to
>> reproduce your issue: I was getting only one output line per window
>> (as expected). Could you try printing the output to the console (or
>> using a different data sink) instead of writing it back to Kafka?
>> Maybe the problem is there. Also, please try removing parts of the
>> code bit by bit, so that you can find what’s causing the problem.
>> As a side note, I have a couple of concerns about your
>> timestamp/watermark/window definitions. First, you set the time
>> characteristic to EventTime:
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> But I don’t see where you actually set the timestamps/watermarks.
>> Didn’t you want to use “.assignTimestampsAndWatermarks(…)” on your
>> input DataStream, based on its content? Moreover, later you define
>> the window by ProcessingTime:
>>>          .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)));
>> which defines the windows independently of the content of those events.
>> Maybe switching properly to EventTime will solve your problem?
>> Thanks, Piotrek
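
To illustrate the EventTime vs ProcessingTime point above, here is a minimal plain-Java sketch (no Flink dependency) of event-time tumbling windows. Each event is assigned to a window from its own timestamp. A window fires when the watermark, computed as the largest timestamp seen minus a fixed out-of-orderness bound (the same idea as Flink's `BoundedOutOfOrdernessTimestampExtractor`), reaches the window's last millisecond. A processing-time window would instead be chosen from the wall clock at arrival, regardless of the event's content. The class, the `Event` type, the 2-second window and the 500 ms bound are hypothetical. For simplicity the watermark is advanced after every event, whereas Flink by default emits periodic watermarks at an interval.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class EventTimeWindowSketch {

    // Hypothetical input event: its own event-time timestamp (ms) and a value to sum.
    static final class Event {
        final long timestamp;
        final int value;

        Event(long timestamp, int value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Start of the tumbling window that contains `timestamp`
    // (assumes non-negative timestamps and no window offset).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Sums event values per tumbling event-time window of `size` ms.
    // Watermark = largest timestamp seen so far - maxOutOfOrderness, advanced after every event.
    // A window [start, start + size) is emitted once the watermark reaches start + size - 1;
    // an event whose window has already reached that point is dropped as late.
    static List<String> run(List<Event> events, long size, long maxOutOfOrderness) {
        TreeMap<Long, Integer> openWindows = new TreeMap<>(); // window start -> running sum
        List<String> emitted = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;

        for (Event e : events) {
            long start = windowStart(e.timestamp, size);
            if (start + size - 1 <= watermark) {
                continue; // late event: its window has already fired
            }
            openWindows.merge(start, e.value, Integer::sum);

            maxTimestamp = Math.max(maxTimestamp, e.timestamp);
            watermark = maxTimestamp - maxOutOfOrderness;

            // Fire windows in start order until one is not yet covered by the watermark.
            Iterator<Map.Entry<Long, Integer>> it = openWindows.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Integer> w = it.next();
                if (w.getKey() + size - 1 > watermark) {
                    break;
                }
                emitted.add("[" + w.getKey() + ", " + (w.getKey() + size) + ") sum=" + w.getValue());
                it.remove();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Event> arrivals = List.of(
                new Event(100, 1),
                new Event(1500, 2),
                new Event(900, 3),   // out of order, but within the 500 ms bound
                new Event(2100, 4),
                new Event(2600, 5),  // watermark reaches 2100, so [0, 2000) fires
                new Event(1800, 6),  // late: [0, 2000) has already fired
                new Event(4700, 7)); // watermark reaches 4200, so [2000, 4000) fires
        System.out.println(run(arrivals, 2000, 500));
        // [[0, 2000) sum=6, [2000, 4000) sum=9]
    }
}
```

With this input, the out-of-order event at 900 ms is still counted because it arrives within the bound. The event at 1800 ms arrives after the watermark has passed 1999 ms and is dropped, which matches Flink's default of zero allowed lateness.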
