The problem is that your input and output paths are the same. Because Flink
executes in a pipelined fashion, all the operators will come up at once. When
you set WriteMode.OVERWRITE for the sink, it will delete the path before
writing anything. That means that when your DataSource reads the input, there
will be nothing left to read. Thus you get an empty DataSet, which you write
to HDFS again. Any further iterations of the loop will then just write
nothing.
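
For illustration, the pattern that triggers this looks roughly like the
following (a schematic sketch, not your actual code; the path is taken from
your arguments and the field types are placeholders):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Source and sink point to the same HDFS path.
    DataSet<Tuple2<Long, Double>> vertices = env
            .readCsvFile("hdfs://path/to/tempgraph")
            .fieldDelimiter(';')
            .types(Long.class, Double.class);

    // With OVERWRITE, the sink clears hdfs://path/to/tempgraph when the job
    // starts, before the DataSource above has read a single record.
    vertices.writeAsCsv("hdfs://path/to/tempgraph", "\n", ";", WriteMode.OVERWRITE);
    env.execute();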

You can circumvent this problem by prefixing every output file with a counter
that you increment in your loop, so that each iteration writes to a distinct
path. Alternatively, if you only want to keep the latest output, you can use
two paths and let them alternate between input and output.
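
Here is a minimal sketch of the counter variant (the paths are taken from
your arguments; maxIterations, the field types, and the body of the loop are
placeholders for your actual job; the sketch appends the counter to the path,
but any scheme that yields a distinct path per iteration works):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.core.fs.FileSystem.WriteMode;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    String basePath = "hdfs://path/to/tempgraph";
    int maxIterations = 10;

    for (int i = 0; i < maxIterations; i++) {
        // Read the previous iteration's output (or the initial input for i == 0) ...
        String inputPath = (i == 0)
                ? "hdfs://path/to/vertices-test-100"
                : basePath + "-" + (i - 1);
        // ... and write to a fresh path that includes the counter.
        String outputPath = basePath + "-" + i;

        DataSet<Tuple2<Long, Double>> vertices = env
                .readCsvFile(inputPath)
                .fieldDelimiter(';')
                .types(Long.class, Double.class);

        // apply one round of your algorithm to 'vertices' here

        // Input and output now differ, so OVERWRITE can no longer delete
        // the data the DataSource still has to read.
        vertices.writeAsCsv(outputPath, "\n", ";", WriteMode.OVERWRITE);
        env.execute("iteration " + i);
    }

For the alternating variant, you would write to basePath + "-" + (i % 2) and
read from the other path (or from the initial input in the first iteration).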

Let me know if you have any further questions.

Kind regards,
Max

On Thu, Jul 2, 2015 at 10:20 AM, Maximilian Michels <m...@apache.org> wrote:

> Hi Mihail,
>
> Thanks for the code. I'm trying to reproduce the problem now.
>
> On Wed, Jul 1, 2015 at 8:30 PM, Mihail Vieru <
> vi...@informatik.hu-berlin.de> wrote:
>
>>  Hi Max,
>>
>> thank you for your reply. I wanted to review and rule out all other
>> factors before writing back. I've attached my code and sample input
>> data.
>>
>> I run the *APSPNaiveJob* using the following arguments:
>>
>> *0 100 hdfs://path/to/vertices-test-100 hdfs://path/to/edges-test-100
>> hdfs://path/to/tempgraph 10 0.5 hdfs://path/to/output-apsp 9*
>>
>> I was wrong; I originally thought that the first writeAsCsv call (line
>> 50) doesn't work. Without WriteMode.OVERWRITE, an exception is thrown
>> when the file already exists.
>>
>> But the problem lies with the second call (line 74), trying to write to
>> the same path on HDFS.
>>
>> This issue is blocking me, because I need to persist the vertices dataset
>> between iterations.
>>
>> Cheers,
>> Mihail
>>
>> P.S.: I'm using the latest 0.10-SNAPSHOT and HDFS 1.2.1.
>>
>>
>>
>> On 30.06.2015 16:51, Maximilian Michels wrote:
>>
>>   Hi Mihail,
>>
>>  Thank you for your question. Do you have a short example that reproduces
>> the problem? It is hard to find the cause without an error message or some
>> example code.
>>
>>  I wonder how your loop works without WriteMode.OVERWRITE because it
>> should throw an exception in this case. Or do you change the file names on
>> every write?
>>
>>  Cheers,
>>  Max
>>
>> On Tue, Jun 30, 2015 at 3:47 PM, Mihail Vieru <
>> vi...@informatik.hu-berlin.de> wrote:
>>
>>>  I think my problem is related to a loop in my job.
>>>
>>> Before the loop, the writeAsCsv method works fine, even in overwrite
>>> mode.
>>>
>>> In the loop, in the first iteration, it writes an empty folder
>>> containing empty files to HDFS, even though the DataSet it is supposed
>>> to write contains elements.
>>>
>>> Needless to say, this doesn't occur in a local execution environment
>>> when writing to the local file system.
>>>
>>>
>>> I would appreciate any input on this.
>>>
>>> Best,
>>> Mihail
>>>
>>>
>>>
>>> On 30.06.2015 12:10, Mihail Vieru wrote:
>>>
>>> Hi Till,
>>>
>>> thank you for your reply.
>>>
>>> I have the following code snippet:
>>>
>>> *intermediateGraph.getVertices().writeAsCsv(tempGraphOutputPath, "\n",
>>> ";", WriteMode.OVERWRITE);*
>>>
>>> When I remove the WriteMode parameter, it works. So I can reason that
>>> the DataSet contains data elements.
>>>
>>> Cheers,
>>> Mihail
>>>
>>>
>>> On 30.06.2015 12:06, Till Rohrmann wrote:
>>>
>>>  Hi Mihail,
>>>
>>> have you checked that the DataSet you want to write to HDFS actually
>>> contains data elements? You can try calling collect which retrieves the
>>> data to your client to see what’s in there.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Jun 30, 2015 at 12:01 PM, Mihail Vieru <
>>> vi...@informatik.hu-berlin.de> wrote:
>>>
>>>> Hi,
>>>>
>>>> the writeAsCsv method is not writing anything to HDFS (version 1.2.1)
>>>> when the WriteMode is set to OVERWRITE.
>>>> A file is created, but it's empty, and there is no trace of errors in
>>>> the Flink or Hadoop logs on any node in the cluster.
>>>>
>>>> What could cause this issue? I really, really need this feature.
>>>>
>>>> Best,
>>>> Mihail
>>>>
>>>
>>>
>>>
>>>
>>
>>
>
