I think the problem lies in how legacy OutputFormats interact with streaming programs. flush() is never called, and the CsvOutputFormat only writes its data out in flush(), so no results ever show up in the file.
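To illustrate the effect, here is a minimal sketch in plain Java. It is not Flink code and does not reproduce CsvOutputFormat; it only assumes that records pass through a buffered writer, and the class name `BufferedSinkDemo` and method `writeRecord` are made up for this example. It shows that a record written to a buffered writer does not reach the underlying target until flush() is called.

```java
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class: plain Java, no Flink dependency.
public class BufferedSinkDemo {

    /**
     * Writes one CSV record through a buffered writer and returns what the
     * underlying stream contains before and after flush() is called.
     */
    public static String[] writeRecord(String user, long byteDiff) throws IOException {
        // Stands in for the output file.
        ByteArrayOutputStream target = new ByteArrayOutputStream();
        BufferedWriter writer = new BufferedWriter(
                new OutputStreamWriter(target, StandardCharsets.UTF_8), 4096);

        // The record lands in the writer's buffer, not in the target.
        writer.write(user + "," + byteDiff + "\n");
        String beforeFlush = target.toString("UTF-8");

        // Only now is the buffered data pushed to the target.
        writer.flush();
        String afterFlush = target.toString("UTF-8");

        return new String[] {beforeFlush, afterFlush};
    }

    public static void main(String[] args) throws IOException {
        String[] seen = writeRecord("someUser", 42L);
        System.out.println("before flush: [" + seen[0] + "]");
        System.out.println("after flush:  [" + seen[1].trim() + "]");
    }
}
```

In a streaming job that never ends, nothing plays the role of that flush() call, which would match the empty CSV files reported below.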

On Mon, 2 May 2016 at 11:59 Fabian Hueske <fhue...@gmail.com> wrote:

> Have you checked the log files as well?
>
> 2016-05-01 14:07 GMT+02:00 subash basnet <yasub...@gmail.com>:
>
>> Hello there,
>>
>> If anyone could help me know why the below *result* DataStream get's
>> written as text, but not as csv?. As it's in a tuple format I guess it
>> should be the same for both text and csv. It shows no error just simply
>> doesn't write to file when result is written as csv.
>>
>> DataStream<Tuple2<String, Long>> *result* =
>>     keyedEdits.timeWindow(Time.seconds(5)).fold(new Tuple2<>("", 0L),
>>         new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
>>             @Override
>>             public Tuple2<String, Long> fold(Tuple2<String, Long> acc,
>>                     WikipediaEditEvent event) {
>>                 acc.f0 = event.getUser();
>>                 acc.f1 += event.getByteDiff();
>>                 return acc;
>>             }
>>         });
>>
>> *result.writeAsText(.....);* -----> It is working.
>> *result.writeAsCsv(.....);* -----> It is not working.
>>
>> Best Regards,
>> Subash Basnet
>>
>> On Wed, Apr 27, 2016 at 4:14 PM, subash basnet <yasub...@gmail.com>
>> wrote:
>>
>>> Hello all,
>>>
>>> I am able to write the Wikipedia edit data to the kafka and as a text
>>> file as per the given example of WikipediaAnalysis. But when I try to write
>>> it as csv, the blank files initially created never gets filled with data.
>>> Below is the code:
>>>
>>> DataStream<Tuple2<String, Long>> result =
>>>     keyedEdits.timeWindow(Time.seconds(5)).fold(new Tuple2<>("", 0L),
>>>         new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
>>>             @Override
>>>             public Tuple2<String, Long> fold(Tuple2<String, Long> acc,
>>>                     WikipediaEditEvent event) {
>>>                 acc.f0 = event.getUser();
>>>                 acc.f1 += event.getByteDiff();
>>>                 return acc;
>>>             }
>>>         });
>>>
>>> *result.writeAsText("file:///home/softwares/flink-1.0.0/kmeans/streaming/result",
>>>     FileSystem.WriteMode.OVERWRITE);* -----> works
>>>
>>> *result.writeAsCsv("file:///home/softwares/flink-1.0.0/kmeans/streaming/result",
>>>     FileSystem.WriteMode.OVERWRITE);* -----> doesn't work
>>>
>>> Why is data getting written to file as text but not as csv?
>>>
>>> Best Regards,
>>> Subash Basnet