Hi all,
I'm trying to build an S3 Parquet bulk writer with a roll policy based on both a
file size limit and checkpoints. To do that I've extended CheckpointRollingPolicy
and overridden shouldRollOnEvent to return true when the part size exceeds the
limit.
The problem is that the part size I get from PartFileInfo.getSize() is always 4
and never changes (possibly just the 4-byte Parquet magic header, since the bulk
writer seems to buffer rows in memory until checkpoint?).
Is this a misconfiguration on my side, or is size-based rolling simply not
supported for S3 Parquet files?
import java.io.IOException;

import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class FileSizeAndOnCheckpointRollingPolicy
        extends CheckpointRollingPolicy<CloudEventAvro, String> {

    private final long rollingFileSize;

    public FileSizeAndOnCheckpointRollingPolicy(long rollingFileSize) {
        this.rollingFileSize = rollingFileSize;
    }

    // Roll the in-progress part file as soon as its size exceeds the limit.
    @Override
    public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, CloudEventAvro element)
            throws IOException {
        log.info("Part size: {}, rolling file size: {}", partFileState.getSize(), rollingFileSize);
        return partFileState.getSize() > rollingFileSize;
    }

    // Never roll on processing time; rolling on checkpoints is inherited
    // from CheckpointRollingPolicy.
    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) {
        return false;
    }
}
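For reference, this is roughly how I wire the policy into the sink. The bucket
path is a placeholder, and I'm assuming the Flink 1.10+ BulkFormatBuilder#withRollingPolicy
together with the flink-parquet ParquetAvroWriters factory (CloudEventAvro is an
Avro SpecificRecord):

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

StreamingFileSink<CloudEventAvro> sink = StreamingFileSink
        .forBulkFormat(
                new Path("s3://my-bucket/events"),          // placeholder bucket/path
                ParquetAvroWriters.forSpecificRecord(CloudEventAvro.class))
        .withRollingPolicy(new FileSizeAndOnCheckpointRollingPolicy(128 * 1024 * 1024)) // 128 MB limit
        .build();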