Hello, I am working on a ParquetSink writer, which converts a Kafka stream to parquet format. I am running into some odd issues when deploying this application to a YARN cluster. I am not 100% sure this is a Flink-related error, but I wanted to reach out to folks here in case it might be.
If I launch Flink within YARN to execute a single job, it runs fine. This is the command I use for the deployment:

*Command:* flink run --jobmanager yarn-cluster -ytm 4096 -yjm 1048 -ys 2 -yn 2 -d -c <class_name> jar_name.jar

However, as soon as I submit a similar job to an already-running YARN session, I get these errors (https://gist.github.com/neoeahit/f0130e9f447ea9c2baa38bf5ee4e6a57) and the application crashes. I checked the location in /tmp where I am creating the file, and no file exists there.

*Command:* flink run -yid application_id -d -c <class_name> jar_name.jar

A bit more about my algorithm: I buffer messages in a temporary array in the invoke() method, and when a specific threshold is reached I write the buffered data to a parquet file. Once the tmp parquet file is created, I upload it to long-term storage. The code that writes the buffered data to a parquet file is:

    writer = Some(AvroParquetWriter.builder(getPendingFilePath(tmp_filename.get))
      .withSchema(schema.get)
      .withCompressionCodec(compressionCodecName)
      .withRowGroupSize(blockSize)
      .withPageSize(pageSize)
      .build())

    bufferedMessages.foreach { e => writer.get.write(e.payload) }

    writer.get.close()

Please do let me know if you have any pointers.

Thanks in advance,
- Vipul
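P.S. In case more context helps, here is a minimal, self-contained sketch of how the sink is structured. It is simplified from my actual code: Message, threshold, the SNAPPY codec, and the /tmp path are placeholders standing in for the real types and values.

    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericRecord
    import org.apache.flink.streaming.api.functions.sink.SinkFunction
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.avro.AvroParquetWriter
    import org.apache.parquet.hadoop.ParquetWriter
    import org.apache.parquet.hadoop.metadata.CompressionCodecName

    import scala.collection.mutable.ArrayBuffer

    // Placeholder element type; `payload` matches the field written in the snippet above.
    case class Message(payload: GenericRecord)

    class ParquetSink(schemaString: String, threshold: Int) extends SinkFunction[Message] {

      // Re-created lazily on the task manager after deserialization.
      @transient private lazy val schema: Schema = new Schema.Parser().parse(schemaString)
      @transient private lazy val bufferedMessages = ArrayBuffer.empty[Message]

      override def invoke(value: Message): Unit = {
        bufferedMessages += value
        if (bufferedMessages.size >= threshold) {
          flush()
          bufferedMessages.clear()
        }
      }

      // Writes the buffer to a tmp parquet file, then hands it off for upload.
      private def flush(): Unit = {
        val tmpFilename = s"/tmp/part-${System.nanoTime()}.parquet" // illustrative path
        val writer: ParquetWriter[GenericRecord] =
          AvroParquetWriter.builder[GenericRecord](new Path(tmpFilename))
            .withSchema(schema)
            .withCompressionCodec(CompressionCodecName.SNAPPY)
            .withRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE)
            .withPageSize(ParquetWriter.DEFAULT_PAGE_SIZE)
            .build()
        bufferedMessages.foreach(e => writer.write(e.payload))
        writer.close()
        // ...upload tmpFilename to long-term storage and remove the local file...
      }
    }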