This isn't related to Flink, but I might be able to help you out anyway.
Does the ParquetFileWriter set the 'overwrite' flag when calling
FileSystem#create()?
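For reference, this is roughly what forcing an overwrite looks like with
parquet-mr's builder API (just a sketch, assuming your parquet-mr version
has ParquetFileWriter.Mode; the path is a placeholder):

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetFileWriter

val schema: Schema = ??? // your Avro schema
val writer = AvroParquetWriter
  .builder[GenericRecord](new Path("/tmp/pending/part-0001.parquet"))
  .withSchema(schema)
  // CREATE (the default) refuses to replace an existing file;
  // OVERWRITE passes the overwrite flag down to FileSystem#create().
  .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
  .build()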
My suspicion is that you create a file for the first batch and write it
out, but never delete it. For the next batch the file cannot be created
(since it still exists), and the write fails.
Since the application then crashes, the /tmp directory probably gets
cleaned up, which is why you don't see any leftover file.
To verify this theory, you can add a simple counter to your sink for the
number of created files. It should succeed for the first batch and fail
on the second one. If that is the case, you should make sure the file is
deleted after the first batch has been written.
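A minimal sketch of that check (writeParquet and uploadToLongTermStorage
stand in for your own code; they are not a real API):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

var filesCreated: Long = 0L

def writeParquet(path: Path): Unit = ???            // your AvroParquetWriter code
def uploadToLongTermStorage(path: Path): Unit = ??? // your upload step

def flushBuffer(path: Path): Unit = {
  filesCreated += 1
  // If the theory holds, this is printed exactly once before the crash.
  println(s"creating parquet file #$filesCreated at $path")
  writeParquet(path)
  uploadToLongTermStorage(path)
  // Delete the local file so the next FileSystem#create() does not
  // run into an already existing one.
  FileSystem.get(new Configuration()).delete(path, false)
}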
On 03.10.2017 08:01, vipul singh wrote:
Hello,
I am working on a ParquetSink writer, which converts a Kafka stream to
Parquet format. I am having some weird issues deploying this application
to a YARN cluster. I am not 100% sure this is a Flink-related error, but
I wanted to reach out to folks here in case it might be.
If I launch Flink within YARN to execute only a single job, it runs fine.
This is the command I use for the deployment:
Command: flink run --jobmanager yarn-cluster -ytm 4096 -yjm 1048 -ys 2 -yn 2 -d -c <class_name> jar_name.jar
However, as soon as I try to submit a similar job to an already running
YARN cluster, I start getting these errors
(https://gist.github.com/neoeahit/f0130e9f447ea9c2baa38bf5ee4e6a57)
and the application crashes. I checked the location in /tmp where I am
creating the file, and there is no file there.
Command: flink run -yid application_id -d -c <class_name> jar_name.jar
A bit more about my algorithm: I use a temp array to buffer messages in
the @invoke method, and when specific thresholds are reached I create a
Parquet file with this buffered data. Once a tmp Parquet file is created,
I upload it to long-term storage.
The code that writes the buffered data to a Parquet file is:
writer = Some(
  AvroParquetWriter.builder(getPendingFilePath(tmp_filename.get))
    .withSchema(schema.get)
    .withCompressionCodec(compressionCodecName)
    .withRowGroupSize(blockSize)
    .withPageSize(pageSize)
    .build())
bufferedMessages.foreach { e =>
  writer.get.write(e.payload)
}
writer.get.close()
Please do let me know.
Thanks in advance,
- Vipul