I have a concurrency problem with the BucketingSink when I write
to S3.
I use the AvroKeyValueSinkWriter.
The problem is that when I send events that are supposed to be written to the
same directory, but to different part files (because of different event types),
the files overwrite each other.
The problem occurs only when I sink the files to S3.
When I write the files to local storage it does not happen, but I think
that is only because of this loop in openNewPartFile:

// The following loop tries different partCounter values in ascending order until it reaches the minimum
// that is not yet used. This works since there is only one parallel subtask that tries names with this
// subtask id. Otherwise we would run into concurrency issues here. This is aligned with the way we now
// clean the base directory in case of rescaling.

int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" +
while (fs.exists(partPath) ||
  fs.exists(getPendingPathFor(partPath)) ||
  fs.exists(getInProgressPathFor(partPath))) {
        partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" +
That makes sense. But on S3 the files do not exist until checkpointing, so
the loop won't find them.
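
To illustrate what I mean, here is a minimal, self-contained sketch (not the actual Flink code). The class name PartFileProbe, the method nextPartName, and the in-memory Set standing in for the file system are all hypothetical, and I am assuming the counter starts at 0. It only mimics the idea of probing for the first unused part name. On a store where a file is visible as soon as it is opened, the second probe moves on to the next counter. On a store where nothing is visible until later (as I believe happens on S3), both probes return the same name:

```java
import java.util.HashSet;
import java.util.Set;

public class PartFileProbe {

    /**
     * Hypothetical stand-in for the probing loop: returns the first part name
     * (prefix-subtask-counter) that is not yet visible in the given store.
     * The real sink also checks pending and in-progress paths; this sketch does not.
     */
    public static String nextPartName(Set<String> visibleFiles, String partPrefix, int subtaskIndex) {
        int partCounter = 0; // assumption: counting starts at 0
        String name = partPrefix + "-" + subtaskIndex + "-" + partCounter;
        while (visibleFiles.contains(name)) {
            partCounter++;
            name = partPrefix + "-" + subtaskIndex + "-" + partCounter;
        }
        return name;
    }

    public static void main(String[] args) {
        // Local-like store: the file is visible as soon as it is opened.
        Set<String> local = new HashSet<>();
        String a = nextPartName(local, "part", 0);
        local.add(a); // opening the file makes it visible immediately
        String b = nextPartName(local, "part", 0);
        System.out.println("local:   " + a + ", " + b);

        // S3-like store (assumption): nothing is visible while the file is still open,
        // so the set stays empty between the two probes.
        Set<String> s3Like = new HashSet<>();
        String c = nextPartName(s3Like, "part", 0);
        String d = nextPartName(s3Like, "part", 0);
        System.out.println("s3-like: " + c + ", " + d);
    }
}
```

In this sketch the local-like case yields part-0-0 and then part-0-1, while the S3-like case yields part-0-0 twice, which would match the overwriting I am seeing.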

After debugging, I noticed that in the invoke method, in
state.getBucketState(), the first time I try to write an event to the bucket it
creates a new bucketState in the HashMap, but the second time I try to write
to the same bucket (with a different event), it does find this new

Thanks for the help! 

Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
