Hello,

I posted this question on Stack Overflow but didn't get any response, so I am
posting it here for help.

https://stackoverflow.com/questions/62068787/flink-s3-write-performance-optimization?noredirect=1#comment109814428_62068787

Details:

I am working on a Flink application on Kubernetes (EKS) that consumes data
from Kafka and writes it to S3.

We have around 120 million XML messages, about 4 TB in total, in Kafka.
Consuming from Kafka is very fast.

The messages are plain strings.

There is high back pressure while writing to S3. We are nowhere near the
S3 PUT request limit, which is around 3,500 requests/sec: I am seeing only
about 300 writes per minute to S3, which is very slow.
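
To put those figures side by side, here is a rough back-of-the-envelope sketch in plain Java. It assumes 4 TB means 4 * 10^12 bytes and takes the 3,500 requests/sec limit as quoted above; the class and method names are only illustrative. It works out to roughly 33 KB per message on average and 5 writes/sec, which is about 0.14% of the PUT limit.

```java
public class S3WriteRate {
    // Figures quoted in this post; 4 TB is assumed to be decimal (4 * 10^12 bytes).
    static final long TOTAL_BYTES = 4_000_000_000_000L;
    static final long MESSAGE_COUNT = 120_000_000L;
    static final double OBSERVED_WRITES_PER_MINUTE = 300.0;
    static final double PUT_LIMIT_PER_SECOND = 3500.0;

    // Average message size in bytes (integer division, rounded down).
    static long averageMessageBytes() {
        return TOTAL_BYTES / MESSAGE_COUNT;
    }

    // Observed S3 write rate converted to writes per second.
    static double writesPerSecond() {
        return OBSERVED_WRITES_PER_MINUTE / 60.0;
    }

    // Observed write rate as a fraction of the quoted PUT request limit.
    static double fractionOfPutLimit() {
        return writesPerSecond() / PUT_LIMIT_PER_SECOND;
    }

    public static void main(String[] args) {
        System.out.println("Average message size (bytes): " + averageMessageBytes());
        System.out.println("Observed writes per second:   " + writesPerSecond());
        System.out.println("Fraction of PUT limit used:   " + fractionOfPutLimit());
    }
}
```

So the request limit itself does not look like the bottleneck here.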

I am using StreamingFileSink to write to S3, with OnCheckpointRollingPolicy
as the rolling policy.

I am using flink-s3-fs-hadoop-*.jar with s3:// as the path scheme (not s3a:// or s3p://).

Other than this, I don't have any S3-related configuration.

    StreamingFileSink<Tuple3<String, String, String>> sink = StreamingFileSink
            .forRowFormat(new Path("s3://BUCKET"),
                    (Tuple3<String, String, String> element, OutputStream stream) -> {
                        // Write only the third tuple field, one record per line
                        PrintStream out = new PrintStream(stream);
                        out.println(element.f2);
                    })
            // Determine component type for each record
            .withBucketAssigner(new CustomBucketAssigner())
            .withRollingPolicy(OnCheckpointRollingPolicy.build())
            .withBucketCheckInterval((TimeUnit.MINUTES.toMillis(1)))
            .build();

Is there anything we can optimize for S3, either in StreamingFileSink or in
flink-conf.yaml?

For example, using a bulk format, or config parameters such as fs.s3.maxThreads?

For checkpointing, too, I am using s3:// instead of s3p:// or s3a://:

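    // Second argument enables incremental checkpoints
    env.setStateBackend((StateBackend) new RocksDBStateBackend("s3://checkpoint_bucket", true));
    // Checkpoint interval in milliseconds (300000 ms = 5 minutes)
    env.enableCheckpointing(300000);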
