I think there is a problem with the interaction of legacy OutputFormats and
streaming programs. Flush is not called, the CsvOutputFormat only writes in
flush(), therefore we don't see any results.

On Mon, 2 May 2016 at 11:59 Fabian Hueske <fhue...@gmail.com> wrote:

> Have you checked the log files as well?
>
> 2016-05-01 14:07 GMT+02:00 subash basnet <yasub...@gmail.com>:
>
>> Hello there,
>>
>> If anyone could help me know why the below *result* DataStream get's
>> written as text, but not as csv?. As it's in a tuple format I guess it
>> should be the same for both text and csv. It shows no error just simply
>> doesn't write to file when result is written as csv.
>>
>> DataStream<Tuple2<String, Long>> *result* =
>> keyedEdits.timeWindow(Time.seconds(5)).fold(new Tuple2<>("", 0L),
>> new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
>> @Override
>> public Tuple2<String, Long> fold(Tuple2<String, Long> acc,
>> WikipediaEditEvent event) {
>> acc.f0 = event.getUser();
>> acc.f1 += event.getByteDiff();
>> return acc;
>> }
>> });
>>
>> *result.writeAsText(.....);
>> ----------------------------------------------------------------------> It
>> is working. **result.writeAsCsv(.....);
>> -----------------------------------------------------------------------> It
>> is not working. *
>>
>> Best Regards,
>> Subash Basnet
>>
>> On Wed, Apr 27, 2016 at 4:14 PM, subash basnet <yasub...@gmail.com>
>> wrote:
>>
>>> Hello all,
>>>
>>> I am able to write the Wikipedia edit data to the kafka and as a text
>>> file as per the given example of WikipediaAnalysis. But when I try to write
>>> it as csv, the blank files initially created never gets filled with data.
>>> Below is the code:
>>>
>>> DataStream<Tuple2<String, Long>> result =
>>> keyedEdits.timeWindow(Time.seconds(5)).fold(new Tuple2<>("", 0L),
>>> new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
>>> @Override
>>> public Tuple2<String, Long> fold(Tuple2<String, Long> acc,
>>> WikipediaEditEvent event) {
>>> acc.f0 = event.getUser();
>>> acc.f1 += event.getByteDiff();
>>> return acc;
>>> }
>>> });
>>>
>>> *result.writeAsText("file:///home/softwares/flink-1.0.0/kmeans/streaming/result",
>>> FileSystem.WriteMode.OVERWRITE); *-------------------------> works
>>>
>>> *result.writeAsCsv("file:///home/softwares/flink-1.0.0/kmeans/streaming/result",
>>> FileSystem.WriteMode.OVERWRITE);* --------------------------> doesn't
>>> work
>>>
>>> Why is data getting written to file as text but not as csv?
>>>
>>> Best Regards,
>>> Subash Basnet
>>>
>>>
>>
>

Reply via email to