Re: S3A AWSS3IOException from Flink's BucketingSink to S3

2018-12-09 Thread Flink Developer
Hi, is there any idea on what causes this and how it can be resolved? Thanks.

‐‐‐ Original Message ‐‐‐
On Wednesday, December 5, 2018 12:44 AM, Flink Developer 
 wrote:

> I have a Flink app with high parallelism (400) running in AWS EMR. It uses 
> Flink v1.5.2. It sources Kafka and sinks to S3 using BucketingSink (using 
> RocksDb backend for checkpointing). The destination is defined using "s3a://" 
> prefix. The Flink job is a streaming app which runs continuously. At any 
> given time, it's possible that each worker will write to a part file in S3. 
> This means all workers combined could potentially generate/write to 400 files 
> (due to 400 parallelism).
>
> After a few days, one of the workers will fail with the exception:
>
> org.apache.hadoop.fs.s3a.AWSS3IOException: 
> copyFile(bucket/2018-09-01/05/_file-10-1.gz.in-progress, 
> bucket/2018-09-01/05/_file-10-1.gz.pending): 
> com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal 
> error. Please try again. (Service: Amazon S3; Status Code: 200 InternalError; 
> Request ID: xx; S3 Extended Request ID: yyy
> at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:178)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java:1803)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.innerRename(S3AFileSystem.java:776)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:662)
> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:575)
> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
>
> This seems to randomly occur when a new part file is created by the 
> BucketingSink. The odd thing is that this happens randomly but consistently 
> on separate job executions. When it occurs, it happens to 1 of the parallel 
> flink workers (not all). Also, when this occurs, the Flink job transitions 
> into a FAILING state, but the Flink job does not restart and resume/recover 
> from the last successful checkpoint.
>
> What is the cause for this and how can it be resolved? Also, how can the job 
> be configured to restart/recover from the last successful checkpoint instead 
> of staying in the FAILING state?

S3A AWSS3IOException from Flink's BucketingSink to S3

2018-12-05 Thread Flink Developer
I have a Flink app with high parallelism (400) running in AWS EMR. It uses 
Flink v1.5.2. It sources Kafka and sinks to S3 using BucketingSink (using 
RocksDb backend for checkpointing). The destination is defined using "s3a://" 
prefix. The Flink job is a streaming app which runs continuously. At any given 
time, it's possible that each worker will write to a part file in S3. This 
means all workers combined could potentially generate/write to 400 files (due 
to 400 parallelism).

After a few days, one of the workers will fail with the exception:

org.apache.hadoop.fs.s3a.AWSS3IOException: 
copyFile(bucket/2018-09-01/05/_file-10-1.gz.in-progress, 
bucket/2018-09-01/05/_file-10-1.gz.pending): 
com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal 
error. Please try again. (Service: Amazon S3; Status Code: 200 InternalError; 
Request ID: xx; S3 Extended Request ID: yyy

at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:178)
at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java:1803)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerRename(S3AFileSystem.java:776)
at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:662)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:575)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)

This seems to randomly occur when a new part file is created by the 
BucketingSink. The odd thing is that this happens randomly but consistently on 
separate job executions. When it occurs, it happens to 1 of the parallel flink 
workers (not all). Also, when this occurs, the Flink job transitions into a 
FAILING state, but the Flink job does not restart and resume/recover from the 
last successful checkpoint.

What is the cause for this and how can it be resolved? Also, how can the job be 
configured to restart/recover from the last successful checkpoint instead of 
staying in the FAILING state?
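
The error text in the trace above asks the caller to try again, which suggests the copy behind the rename failed transiently. As a minimal sketch of that idea only, the following retries an operation with exponential backoff, assuming the failing call is safe to repeat. `RetrySketch`, `retry`, and its parameters are hypothetical names for illustration; they are not part of Flink, Hadoop S3A, or the AWS SDK, and this does not answer the separate question about restarting from a checkpoint.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

public class RetrySketch {

    /**
     * Runs op and retries it on IOException, doubling the wait between attempts.
     * Rethrows the last IOException once maxAttempts is exhausted.
     */
    public static <T> T retry(Callable<T> op, int maxAttempts, long initialDelayMs)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delayMs = initialDelayMs;
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.call();
            } catch (IOException e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMs); // back off before the next attempt
                    delayMs *= 2;
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated operation: fails twice with an IOException, then succeeds.
        String result = retry(() -> {
            if (++calls[0] < 3) {
                throw new IOException("simulated transient error");
            }
            return "renamed";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Whether a retry belongs in a custom sink, in the file system configuration, or nowhere at all depends on the setup; the sketch only shows the backoff loop itself.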

Re: Flink Exception - AmazonS3Exception and ExecutionGraph - Error in failover strategy

2018-12-04 Thread Flink Developer
When this happens, it appears that one of the workers fails but the rest of the 
workers continue to run. How would I be able to configure the app to be able to 
recover itself completely from the last successful checkpoint when this happens?

‐‐‐ Original Message ‐‐‐
On Monday, December 3, 2018 11:02 AM, Flink Developer 
 wrote:

> I have a Flink app on 1.5.2 which sources data from a Kafka topic (400 
> partitions) and runs with a parallelism of 400. It sinks to S3 using 
> BucketingSink and uses RocksDB for checkpointing. The checkpoint interval is 
> 2 min and the checkpoint timeout is 2 min. The checkpoint size is a few MB. 
> After running for a few days, I see:
>
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Error in failover strategy - falling back to global restart
> java.lang.ClassCastException: com.amazonaws.services.s3.model.AmazonS3Exception cannot be cast to com.amazonaws.AmazonClientException
> at org.apache.hadoop.fs.s3a.AWSClientIOException.getCause(AWSClientIOException.java:42)
> at org.apache.flink.util.SerializedThrowable
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyJobStatus()
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>
> What causes the exception, and why is the Flink job unable to recover? The 
> log states that it is falling back to a global restart. How can this be 
> configured to recover properly? Is the checkpoint interval/timeout too low? 
> The Flink job's configuration shows Restart with fixed delay (0ms) 
> #2147483647 restart attempts.

Flink Exception - AmazonS3Exception and ExecutionGraph - Error in failover strategy

2018-12-03 Thread Flink Developer
I have a Flink app on 1.5.2 which sources data from a Kafka topic (400 
partitions) and runs with a parallelism of 400. It sinks to S3 using 
BucketingSink and uses RocksDB for checkpointing. The checkpoint interval is 
2 min and the checkpoint timeout is 2 min. The checkpoint size is a few MB. 
After running for a few days, I see:

org.apache.flink.runtime.executiongraph.ExecutionGraph - Error in failover strategy - falling back to global restart
java.lang.ClassCastException: com.amazonaws.services.s3.model.AmazonS3Exception cannot be cast to com.amazonaws.AmazonClientException
at org.apache.hadoop.fs.s3a.AWSClientIOException.getCause(AWSClientIOException.java:42)
at org.apache.flink.util.SerializedThrowable
at org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyJobStatus()
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)

What causes the exception, and why is the Flink job unable to recover? The 
log states that it is falling back to a global restart. How can this be 
configured to recover properly? Is the checkpoint interval/timeout too low? 
The Flink job's configuration shows Restart with fixed delay (0ms) 
#2147483647 restart attempts.

Re: Re: Flink job failing due to "Container is running beyond physical memory limits" error.

2018-11-26 Thread Flink Developer
Also, after the Flink job has failed from the above error, the Flink job is 
unable to recover from previous checkpoint. Is this the expected behavior? How 
can the job be recovered successfully from this?

‐‐‐ Original Message ‐‐‐
On Monday, November 26, 2018 12:35 AM, Flink Developer 
 wrote:

> I am also experiencing this error message "Container is running beyond 
> physical memory limits". In my case, I am using Flink 1.5.2 with 10 task 
> managers, with 40 slots for each task manager. The memory assigned during 
> flink cluster creation is 1024MB per task manager. The checkpoint is using 
> RocksDb and the checkpoint size is very small (10MB).
>
> Is the simple solution to increase the Task Manager memory size? I will try 
> going from 1024MB to 4096MB per task manager.
>
> ‐‐‐ Original Message ‐‐‐
> On Sunday, November 25, 2018 7:58 PM, zhijiang  
> wrote:
>
>> I think it is probably related to RocksDB memory usage if you have not 
>> seen an OutOfMemory issue before.
>>
>> There is already a Jira ticket [1] for fixing this issue, and you can 
>> watch it for updates. :)
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-10884
>>
>> Best,
>> Zhijiang
>>
>>> --
>>> From: Gagan Agrawal 
>>> Sent: Saturday, November 24, 2018 14:14
>>> To: user 
>>> Subject: Flink job failing due to "Container is running beyond physical 
>>> memory limits" error.
>>>
>>> Hi,
>>> I am running a Flink job on YARN. It ran fine for 4-5 days and has now 
>>> started failing with the following errors.
>>>
>>> 2018-11-24 03:46:21,029 INFO  org.apache.flink.yarn.YarnResourceManager 
>>> - Closing TaskExecutor connection 
>>> container_1542008917197_0038_01_06 because: Container 
>>> [pid=18380,containerID=container_1542008917197_0038_01_06] is running 
>>> beyond physical memory limits. Current usage: 3.0 GB of 3 GB physical 
>>> memory used; 5.0 GB of 15 GB virtual memory used. Killing container.
>>>
>>> This is a simple job in which we read 2 Avro streams from Kafka, create a 
>>> keyed stream from the union of those 2 streams, apply a custom UDF, and 
>>> write the output back to Kafka. The UDF internally uses Map State with the 
>>> RocksDB backend. The checkpoint size is currently around 300 GB, and we 
>>> are running this with 10 task managers with 3 GB of memory each. I have 
>>> also set "containerized.heap-cutoff-ratio: 0.5" but am still facing the 
>>> same issue. The Flink version is 1.6.2.
>>>
>>> Here is the flink command:
>>> ./bin/flink run -m yarn-cluster -yd -yn 10 -ytm 3072 -ys 4 job.jar
>>>
>>> What are the typical reasons for this issue? Also, why would Flink consume 
>>> more memory than allocated, given that JVM memory is fixed and will not 
>>> grow beyond the max heap? Could this be related to RocksDB, which may be 
>>> consuming memory outside the heap and hence exceeding the defined limits? 
>>> I didn't see this issue when the checkpoint size was small (<50 GB), but 
>>> now that we are at 300 GB, it occurs frequently. I can try increasing 
>>> memory, but I would still like to know the typical reasons for this error 
>>> if JVM heap memory cannot grow beyond the defined limit.
>>>
>>> Gagan

Re: Flink Exception - assigned slot container was removed

2018-11-26 Thread Flink Developer
In addition, after the Flink job has failed from the above exception, the Flink 
job is unable to recover from previous checkpoint. Is this the expected 
behavior? How can the job be recovered successfully from this?

‐‐‐ Original Message ‐‐‐
On Monday, November 26, 2018 12:30 AM, Flink Developer 
 wrote:

> Thanks for the suggestion Qi. I tried increasing slot.idle.timeout to 360 
> but it seems to still have encountered the issue. Does this mean if a slot or 
> "flink worker" has not processed items for 1 hour, that it will be removed?
>
> Would any other flink configuration properties help for this?
>
> slot.request.timeout
> web.timeout
> heartbeat.interval
> heartbeat.timeout
>
> ‐‐‐ Original Message ‐‐‐
> On Sunday, November 25, 2018 6:56 PM, 罗齐  wrote:
>
>> Hi,
>>
>> It looks like some of your slots were freed during the job execution 
>> (possibly due to being idle for too long). AFAIK the exception was thrown 
>> when a pending slot request was removed. You can try increasing 
>> "slot.idle.timeout" to mitigate this issue (default is 50000, try 360 or 
>> higher).
>>
>> Regards,
>> Qi
>>
>>> On Nov 26, 2018, at 7:36 AM, Flink Developer  
>>> wrote:
>>>
>>> Hi, I have a Flink application sourcing from a topic in Kafka (400 
>>> partitions) and sinking to S3 using bucketingsink and using RocksDb for 
>>> checkpointing every 2 mins. The Flink app runs with parallelism 400 so that 
>>> each worker handles a partition. This is using Flink 1.5.2. The Flink 
>>> cluster uses 10 task managers with 40 slots each.
>>>
>>> After running for a few days straight, it encounters a Flink exception:
>>> org.apache.flink.util.FlinkException: The assigned slot 
>>> container_1234567_0003_01_09_1 was removed.
>>>
>>> This causes the Flink job to fail. It is odd to me. I am unsure what causes 
>>> this. Also, during this time, I see some checkpoints stating "checkpoint 
>>> was declined (tasks not ready)". At this point, the job is unable to 
>>> recover and fails. Does this happen if a slot or worker is not doing 
>>> processing for X amount of time? Would I need to increase the Flink config 
>>> properties for the following when creating the Flink cluster in yarn?
>>>
>>> slot.idle.timeout
>>> slot.request.timeout
>>> web.timeout
>>> heartbeat.interval
>>> heartbeat.timeout
>>>
>>> Any help would be greatly appreciated.

Re: Re: Flink job failing due to "Container is running beyond physical memory limits" error.

2018-11-26 Thread Flink Developer
I am also experiencing this error message "Container is running beyond physical 
memory limits". In my case, I am using Flink 1.5.2 with 10 task managers, with 
40 slots for each task manager. The memory assigned during flink cluster 
creation is 1024MB per task manager. The checkpoint is using RocksDb and the 
checkpoint size is very small (10MB).

Is the simple solution to increase the Task Manager memory size? I will try 
going from 1024MB to 4096MB per task manager.

‐‐‐ Original Message ‐‐‐
On Sunday, November 25, 2018 7:58 PM, zhijiang  
wrote:

> I think it is probably related to RocksDB memory usage if you have not seen 
> an OutOfMemory issue before.
>
> There is already a Jira ticket [1] for fixing this issue, and you can 
> watch it for updates. :)
>
> [1] https://issues.apache.org/jira/browse/FLINK-10884
>
> Best,
> Zhijiang
>
>> --
>> From: Gagan Agrawal 
>> Sent: Saturday, November 24, 2018 14:14
>> To: user 
>> Subject: Flink job failing due to "Container is running beyond physical 
>> memory limits" error.
>>
>> Hi,
>> I am running a Flink job on YARN. It ran fine for 4-5 days and has now 
>> started failing with the following errors.
>>
>> 2018-11-24 03:46:21,029 INFO  org.apache.flink.yarn.YarnResourceManager 
>> - Closing TaskExecutor connection 
>> container_1542008917197_0038_01_06 because: Container 
>> [pid=18380,containerID=container_1542008917197_0038_01_06] is running 
>> beyond physical memory limits. Current usage: 3.0 GB of 3 GB physical memory 
>> used; 5.0 GB of 15 GB virtual memory used. Killing container.
>>
>> This is a simple job in which we read 2 Avro streams from Kafka, create a 
>> keyed stream from the union of those 2 streams, apply a custom UDF, and 
>> write the output back to Kafka. The UDF internally uses Map State with the 
>> RocksDB backend. The checkpoint size is currently around 300 GB, and we are 
>> running this with 10 task managers with 3 GB of memory each. I have also 
>> set "containerized.heap-cutoff-ratio: 0.5" but am still facing the same 
>> issue. The Flink version is 1.6.2.
>>
>> Here is the flink command:
>> ./bin/flink run -m yarn-cluster -yd -yn 10 -ytm 3072 -ys 4 job.jar
>>
>> What are the typical reasons for this issue? Also, why would Flink consume 
>> more memory than allocated, given that JVM memory is fixed and will not 
>> grow beyond the max heap? Could this be related to RocksDB, which may be 
>> consuming memory outside the heap and hence exceeding the defined limits? I 
>> didn't see this issue when the checkpoint size was small (<50 GB), but now 
>> that we are at 300 GB, it occurs frequently. I can try increasing memory, 
>> but I would still like to know the typical reasons for this error if JVM 
>> heap memory cannot grow beyond the defined limit.
>>
>> Gagan
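
The quoted question turns on how a YARN container's memory is split between the JVM heap and everything else. A rough sketch of that arithmetic follows, assuming the heap is the container size minus a cutoff of max(minimum cutoff, ratio x container size). The 600 MB minimum and the 0.25 ratio used for the 1024 MB case are assumptions about the defaults for these Flink versions, and the sketch ignores network buffers and off-heap managed memory. `HeapCutoffSketch` and `heapMb` are hypothetical names, not Flink code.

```java
public class HeapCutoffSketch {

    /**
     * Approximate JVM heap (in MB) left after reserving the container cutoff.
     * Assumed formula: cutoff = max(minCutoffMb, containerMb * cutoffRatio).
     */
    public static long heapMb(long containerMb, double cutoffRatio, long minCutoffMb) {
        long cutoffMb = Math.max(minCutoffMb, (long) (containerMb * cutoffRatio));
        if (cutoffMb >= containerMb) {
            throw new IllegalArgumentException("cutoff leaves no memory for the heap");
        }
        return containerMb - cutoffMb;
    }

    public static void main(String[] args) {
        // -ytm 3072 with containerized.heap-cutoff-ratio: 0.5 (values from the thread)
        System.out.println("3072 MB container: heap ~" + heapMb(3072, 0.5, 600) + " MB");
        // 1024 MB per task manager; ratio 0.25 and minimum 600 MB are assumed defaults
        System.out.println("1024 MB container: heap ~" + heapMb(1024, 0.25, 600) + " MB");
    }
}
```

Under these assumptions the cutoff is the only budget for memory outside the heap, which includes native allocations made by RocksDB. YARN enforces the limit on the whole process rather than on the heap, so a fixed heap size alone does not keep a container within its limit.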

Re: Flink Exception - assigned slot container was removed

2018-11-26 Thread Flink Developer
Thanks for the suggestion Qi. I tried increasing slot.idle.timeout to 360 
but it seems to still have encountered the issue. Does this mean if a slot or 
"flink worker" has not processed items for 1 hour, that it will be removed?

Would any other flink configuration properties help for this?

slot.request.timeout
web.timeout
heartbeat.interval
heartbeat.timeout
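
The message above ties the value 360 to an idle period of 1 hour. As a small sketch of the unit arithmetic only, assuming these timeout settings are interpreted as milliseconds (an assumption; the configuration reference for the Flink version in use is the authority), the following converts between hours and milliseconds. `TimeoutMillisSketch` and its methods are hypothetical helper names.

```java
import java.time.Duration;

public class TimeoutMillisSketch {

    /** Milliseconds in the given number of hours. */
    public static long hoursToMillis(long hours) {
        return Duration.ofHours(hours).toMillis();
    }

    /** Seconds represented by a value given in milliseconds. */
    public static double millisToSeconds(long millis) {
        return millis / 1000.0;
    }

    public static void main(String[] args) {
        // One hour expressed in milliseconds
        System.out.println("1 hour = " + hoursToMillis(1) + " ms");
        // What a setting of 360 would mean if the unit is milliseconds
        System.out.println("360 ms = " + millisToSeconds(360) + " s");
    }
}
```

If the unit is milliseconds, a value of 360 is well under a second, so a one-hour idle timeout would need a value of 3600000.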

‐‐‐ Original Message ‐‐‐
On Sunday, November 25, 2018 6:56 PM, 罗齐  wrote:

> Hi,
>
> It looks like some of your slots were freed during the job execution 
> (possibly due to being idle for too long). AFAIK the exception was thrown 
> when a pending slot request was removed. You can try increasing 
> "slot.idle.timeout" to mitigate this issue (default is 50000, try 360 or 
> higher).
>
> Regards,
> Qi
>
>> On Nov 26, 2018, at 7:36 AM, Flink Developer  
>> wrote:
>>
>> Hi, I have a Flink application sourcing from a topic in Kafka (400 
>> partitions) and sinking to S3 using bucketingsink and using RocksDb for 
>> checkpointing every 2 mins. The Flink app runs with parallelism 400 so that 
>> each worker handles a partition. This is using Flink 1.5.2. The Flink 
>> cluster uses 10 task managers with 40 slots each.
>>
>> After running for a few days straight, it encounters a Flink exception:
>> org.apache.flink.util.FlinkException: The assigned slot 
>> container_1234567_0003_01_09_1 was removed.
>>
>> This causes the Flink job to fail. It is odd to me. I am unsure what causes 
>> this. Also, during this time, I see some checkpoints stating "checkpoint was 
>> declined (tasks not ready)". At this point, the job is unable to recover and 
>> fails. Does this happen if a slot or worker is not doing processing for X 
>> amount of time? Would I need to increase the Flink config properties for the 
>> following when creating the Flink cluster in yarn?
>>
>> slot.idle.timeout
>> slot.request.timeout
>> web.timeout
>> heartbeat.interval
>> heartbeat.timeout
>>
>> Any help would be greatly appreciated.

Flink Exception - assigned slot container was removed

2018-11-25 Thread Flink Developer
Hi, I have a Flink application sourcing from a topic in Kafka (400 partitions) 
and sinking to S3 using bucketingsink and using RocksDb for checkpointing every 
2 mins. The Flink app runs with parallelism 400 so that each worker handles a 
partition. This is using Flink 1.5.2. The Flink cluster uses 10 task managers 
with 40 slots each.

After running for a few days straight, it encounters a Flink exception:
org.apache.flink.util.FlinkException: The assigned slot 
container_1234567_0003_01_09_1 was removed.

This causes the Flink job to fail. It is odd to me. I am unsure what causes 
this. Also, during this time, I see some checkpoints stating "checkpoint was 
declined (tasks not ready)". At this point, the job is unable to recover and 
fails. Does this happen if a slot or worker is not doing processing for X 
amount of time? Would I need to increase the Flink config properties for the 
following when creating the Flink cluster in yarn?

slot.idle.timeout
slot.request.timeout
web.timeout
heartbeat.interval
heartbeat.timeout

Any help would be greatly appreciated.

Re: How to get Kafka record's timestamp using Flink's KeyedDeserializationSchema?

2018-11-16 Thread Flink Developer
Thanks. Is there an alternative way to obtain the Kafka record timestamps using 
FlinkKafkaConsumer?

‐‐‐ Original Message ‐‐‐
On Friday, November 16, 2018 9:59 AM, Andrey Zagrebin 
 wrote:

> Hi,
>
> I think this is still on-going effort. You can monitor the corresponding 
> issues [1] and [2].
>
> Best,
> Andrey
>
> [1] https://issues.apache.org/jira/browse/FLINK-8500
> [2] https://issues.apache.org/jira/browse/FLINK-8354
>
>> On 16 Nov 2018, at 09:41, Flink Developer  
>> wrote:
>>
>> Kafka timestamp

How to get Kafka record's timestamp using Flink's KeyedDeserializationSchema?

2018-11-16 Thread Flink Developer
Hi, I have a flink app which uses the FlinkKafkaConsumer.

I am interested in retrieving the Kafka timestamp for a given record/offset 
using the *KeyedDeserializationSchema* which provides topic, partition, offset 
and message.

How can the timestamp be obtained through this interface?

Thank you

How to use multiple sources with multiple sinks

2018-11-10 Thread Flink Developer
How can I configure 1 Flink job (stream execution environment, parallelism set 
to 10) to have multiple Kafka sources, each with its own sink to S3?

For example, let's say the sources are:

- Kafka Topic A - Consumer (10 partitions)
- Kafka Topic B - Consumer (10 partitions)
- Kafka Topic C - Consumer (10 partitions)

And let's say the sinks are:

- BucketingSink to S3 in bucket: s3://kafka_topic_a/
- BucketingSink to S3 in bucket: s3://kafka_topic_b/
- BucketingSink to S3 in bucket: s3://kafka_topic_c/

Between source 1 and sink 1, I would like to perform unique processing. 
Likewise, source 2 to sink 2 and source 3 to sink 3 should each have their 
own unique processing.

How can this be achieved? Is there an example?

Re: Flink Kafka to BucketingSink to S3 - S3Exception

2018-11-07 Thread Flink Developer
Thank you Addison and Ravi for the detailed info.

Hi Addison, it sounds like StreamingFileSink is promising and will be available 
in Flink 1.7. From the mailing list, it looks like Flink 1.7 RC is now 
available for use.

Some questions for you... in your use case, is your source Kafka and is the 
Flink app running high parallelism (>300)? Are you able to run with 
StreamingFileSink to S3 for multiple days without failure? When using 
StreamingFileSink, what type of configuration did you use? Thank you.

‐‐‐ Original Message ‐‐‐
On Monday, November 5, 2018 8:23 AM, Addison Higham  wrote:

> Hi there,
>
> This is going to be a bit of a long post, but I think there has been a lot of 
> confusion around S3, so I am going to go over everything I know in hopes that 
> helps.
>
> As mentioned by Rafi, The BucketingSink does not work for file systems like 
> S3, as the bucketing sink makes some assumptions that are incorrect for 
> eventually consistent file systems as well as for file systems that don't 
> have certain atomic operations, which leads to inconsistency (see 
> https://issues.apache.org/jira/browse/FLINK-6306). This has been poorly 
> documented in the docs, so I think a lot of people have tried to use s3 only 
> to face issues. There is a plan for moving forward however.
>
> However, that plan does NOT include "fixing" the BucketingSink. Instead, a 
> new API - the StreamingFileSink - is the replacement for BucketingSink. It 
> was first introduced in Flink 1.6 and is planned to (eventually) fix the problem. 
> The first release of StreamingFileSink in the 1.6 branch didn't support S3. 
> This was originally seen as a bug that would be fixed in Flink 1.6.2, 
> however, once all the work was done to add support for S3, it seems it was 
> decided not to backport the fix (see this thread: 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/FLINK-9752-s3-recoverable-writer-not-actually-fixed-in-1-6-2-td24925.html).
>  This means that flink 1.6.2 does NOT fix the S3 issue, but the fix will be 
> included in 1.7, which is currently in feature freeze and will hopefully have 
> an RC in the next couple of weeks.
>
> But yes, if you need S3 support ASAP, you are in a bit of a pickle. My team 
> is in that situation, so these are the options as we saw them:
>
> 0. Wait for flink 1.7
> 1. Build and run your own flink from the master or flink-1.7 branch, which 
> has support for S3 and StreamingFileSink
> 2. Write our own custom sink for s3 (probably with some caveats)
> 3. Backport the changes into flink 1.6
>
> We really didn't want to wait for 1.7, as that would hurt our delivery 
> timeline. We didn't love the idea of running an unreleased version of flink 
> in production either. As we looked into writing something ourselves, it 
> became clear pretty quickly that we could fairly easily get an output sink 
> with at-least-once delivery to a file, but that exactly-once delivery would 
> be significantly more difficult. At-least-once is actually okay for our use 
> case, but we decided we would rather not have to revisit this later on, 
> change all the code, and then run a one-off job to remove dupes. Instead, we 
> decided to backport the changes into the 1.6 branch. Luckily, we already 
> build our own flink, so we had that tooling already. The backport took a few 
> hours (it was fairly complicated to get all the changes), but we seem to 
> have gotten everything working. The backport is here: 
> https://github.com/instructure/flink/tree/s3_recover_backport. Our plan is to 
> use that backport until 1.7 is stable, then we can upgrade without 
> (hopefully) having to change any code. We still recognize there is a 
> possibility of bugs in the backport, but for us that is mitigated by the 
> fact that we are okay with at-least-once and, if all else fails, we have a 
> period of transition where this data is also being written to another 
> location we can fall back to.
>
> So yeah, to reiterate, no out-of-the-box S3 stuff works ATM, but that should 
> hopefully be fixed *soon*. If you can wait, that is the easiest, if you 
> can't, building either your own custom sink or your own flink with the 
> backport isn't a terrible option.
>
> Hope that helps!
>
> Addison
>
> On Sun, Nov 4, 2018 at 3:09 AM Flink Developer  
> wrote:
>
>> Hi Ravi, some questions:
>>
>> - Is this using Flink 1.6.2 with dependencies (flink-s3-fs-hadoop, 
>> flink-statebackend-rocksdb, hadoop-common, hadoop-aws, hadoop-hdfs, 
>> hadoop-common) ? If so, could you please share your dependency versioning?
>> - Does this use a kafka source with high flink parallelism (~400) for all 
>> kafka partitions and does it run continuously for several days?

Recommendation for BucketingSink to GZIP compress files to S3

2018-11-04 Thread Flink Developer
Hi, what is the recommended method for using BucketingSink and compressing 
files using GZIP before they are uploaded to S3?

I read that one way is to extend the StreamWriterBase class and wrap the stream 
using GZIPOutputStream. Is there a Flink example for this? If so, what would 
be the proper way to override the methods (open(), write(), close(), flush())?

Thank you.
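
The wrapping idea mentioned above can be sketched with the JDK alone. This is a minimal illustration of placing a GZIPOutputStream around an underlying stream and mapping write(), flush(), and close() onto it; it is not an implementation of Flink's writer interface, and `GzipLineWriterSketch` and its methods are hypothetical names. An in-memory buffer stands in for the part file.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipLineWriterSketch implements Closeable {

    private final GZIPOutputStream gzip;

    /** Wraps the target stream; this plays the role an open() method would. */
    public GzipLineWriterSketch(OutputStream target) throws IOException {
        // syncFlush = true so that flush() pushes pending compressed bytes to target
        this.gzip = new GZIPOutputStream(target, true);
    }

    /** Writes one record as a UTF-8 line into the compressed stream. */
    public void write(String record) throws IOException {
        gzip.write(record.getBytes(StandardCharsets.UTF_8));
        gzip.write('\n');
    }

    public void flush() throws IOException {
        gzip.flush();
    }

    /** Writes the GZIP trailer and closes the underlying stream. */
    @Override
    public void close() throws IOException {
        gzip.close();
    }

    /** Helper for the demo: decompresses GZIP bytes back into a string. */
    public static String decompress(byte[] data) throws IOException {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream(); // stands in for a part file
        try (GzipLineWriterSketch writer = new GzipLineWriterSketch(sink)) {
            writer.write("record-1");
            writer.write("record-2");
            writer.flush();
        }
        System.out.print(decompress(sink.toByteArray()));
    }
}
```

The detail that matters most when adapting this is close(): a GZIP file is only valid once its trailer has been written, so the compressed stream has to be closed (or finished) before the part file is treated as complete.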

Re: Flink Kafka to BucketingSink to S3 - S3Exception

2018-11-04 Thread Flink Developer
Hi Ravi, some questions:

- Is this using Flink 1.6.2 with dependencies (flink-s3-fs-hadoop, 
flink-statebackend-rocksdb, hadoop-common, hadoop-aws, hadoop-hdfs, 
hadoop-common) ? If so, could you please share your dependency versioning?
- Does this use a kafka source with high flink parallelism (~400) for all kafka 
partitions and does it run continuously for several days?
- Could you please share your checkpoint interval configuration, batch file 
size, batch rollover interval configuration, and sink prefix (s3:// ,  s3a://)

Thank you
‐‐‐ Original Message ‐‐‐
On Saturday, November 3, 2018 7:18 AM, Ravi Bhushan Ratnakar 
 wrote:

> I made a few small changes to BucketingSink and implemented them as a new 
> CustomBucketingSink for my project, which works fine with the s3 and s3a 
> protocols. This implementation doesn't require XML file configuration; 
> instead, it uses configuration provided through a Flink configuration object 
> by calling the setConfig method of BucketingSink.
>
> On Sat 3 Nov, 2018, 09:24 Flink Developer 
>> It seems the issue also appears when using Flink version 1.6.2 .
>> ‐‐‐ Original Message ‐‐‐
>> On Tuesday, October 30, 2018 10:26 PM, Flink Developer 
>>  wrote:
>>
>>> Hi, thanks for the info Rafi, that seems to be related.  I hope Flink 
>>> version 1.6.2 fixes this. Has anyone encountered this before?
>>>
>>> I would also like to note that my jar includes a core-site.xml file that 
>>> uses *s3a*. Is this the recommended configuration to use with 
>>> BucketingSink?   Should the sink be specified using s3a:/// 
>>> or  s3:/// ?
>>>
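>>> <configuration>
>>>   <property>
>>>     <name>fs.s3.impl</name>
>>>     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>>>   </property>
>>>   <property>
>>>     <name>fs.s3a.buffer.dir</name>
>>>     <value>/tmp</value>
>>>   </property>
>>>   <property>
>>>     <name>fs.s3a.access.key</name>
>>>     <value>x</value>
>>>   </property>
>>>   <property>
>>>     <name>fs.s3a.secret.key</name>
>>>     <value>x</value>
>>>   </property>
>>>   <property>
>>>     <name>fs.s3a.buffer.dir</name>
>>>     <value>/tmp</value>
>>>   </property>
>>> </configuration>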
>>>
>>> And my pom.xml uses:
>>>
>>> - flink-s3-fs-hadoop
>>> - ...
>>> - flink-statebackend-rocksdb_2.11
>>> - ...
>>> - hadoop-hdfs
>>> - ...
>>> - hadoop-common
>>> - ...
>>> - hadoop-core
>>> - ...
>>> - hadoop-aws
>>> - ...
>>>
>>> ‐‐‐ Original Message ‐‐‐
>>> On Sunday, October 28, 2018 8:08 AM, Rafi Aroch  
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm also experiencing this with Flink 1.5.2. This is probably related to 
>>>> BucketingSink not working properly with S3 as filesystem because of the 
>>>> eventual-consistency of S3.
>>>>
>>>> I see that https://issues.apache.org/jira/browse/FLINK-9752 will be part 
>>>> of 1.6.2 release. It might help, if you use the flink-s3-fs-hadoop (and 
>>>> not presto).
>>>>
>>>> Does anyone know if this fix would solve this issue?
>>>>
>>>> Thanks,
>>>> Rafi
>>>>
>>>> On Sun, Oct 28, 2018 at 12:08 AM Flink Developer 
>>>>  wrote:
>>>>
>>>>> Hi, I'm running a scala flink app in an AWS EMR cluster (emr 5.17, hadoop 
>>>>> 2.8.4)  with flink parallelization set to 400. The source is a Kafka 
>>>>> topic and sinks to S3 in the format of: 
>>>>> s3:/. There's potentially 400 files 
>>>>> writing simultaneously.
>>>>>
>>>>> Configuration:
>>>>> - Flink v1.5.2
>>>>> - Checkpointing enabled w/ RocksDb (flink-statebackend-rocksdb_2.11, 
>>>>> v1.6.1). Interval is every 2 mins with max concurrent set to 1. Min pause 
>>>>> between checkpoints is 2 mins. Timeout is set to 2 mins.
>>>>> - BucketingSink (flink-connector-filesystem_2.11, v1.6.1).
>>>>> - Batch file size is set to 5mb.
>>>>> - Batch rollover interval is set to 30min
>>>>> - Writer uses GZip compression
>>>>> - Hadoop Maven Dependencies (hadoop-hdfs v3.1.1, hadoop-common v3.1.1, 
>>>>> hadoop-core v1.2.1, hadoop-aws v3.1.1)
>>>>>
>>>>> The app is able to run for hours straight, but occasionally (once or 
>>>>> twice a day), it displays the following exception. When this happens, the 

Re: Flink Kafka to BucketingSink to S3 - S3Exception

2018-11-03 Thread Flink Developer
It seems the issue also appears when using Flink version 1.6.2 .
‐‐‐ Original Message ‐‐‐
On Tuesday, October 30, 2018 10:26 PM, Flink Developer 
 wrote:

> Hi, thanks for the info Rafi, that seems to be related.  I hope Flink version 
> 1.6.2 fixes this. Has anyone encountered this before?
>
> I would also like to note that my jar includes a core-site.xml file that uses 
> *s3a*. Is this the recommended configuration to use with BucketingSink?   
> Should the sink be specified using s3a:/// or  
> s3:/// ?
>
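> <configuration>
>   <property>
>     <name>fs.s3.impl</name>
>     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>   </property>
>   <property>
>     <name>fs.s3a.buffer.dir</name>
>     <value>/tmp</value>
>   </property>
>   <property>
>     <name>fs.s3a.access.key</name>
>     <value>x</value>
>   </property>
>   <property>
>     <name>fs.s3a.secret.key</name>
>     <value>x</value>
>   </property>
>   <property>
>     <name>fs.s3a.buffer.dir</name>
>     <value>/tmp</value>
>   </property>
> </configuration>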
>
> And my pom.xml uses:
>
> - flink-s3-fs-hadoop
> - ...
> - flink-statebackend-rocksdb_2.11
> - ...
> - hadoop-hdfs
> - ...
> - hadoop-common
> - ...
> - hadoop-core
> - ...
> - hadoop-aws
> - ...
>
> ‐‐‐ Original Message ‐‐‐
> On Sunday, October 28, 2018 8:08 AM, Rafi Aroch  wrote:
>
>> Hi,
>>
>> I'm also experiencing this with Flink 1.5.2. This is probably related to 
>> BucketingSink not working properly with S3 as filesystem because of the 
>> eventual-consistency of S3.
>>
>> I see that https://issues.apache.org/jira/browse/FLINK-9752 will be part of 
>> 1.6.2 release. It might help, if you use the flink-s3-fs-hadoop (and not 
>> presto).
>>
>> Does anyone know if this fix would solve this issue?
>>
>> Thanks,
>> Rafi
>>
>> On Sun, Oct 28, 2018 at 12:08 AM Flink Developer 
>>  wrote:
>>
>>> Hi, I'm running a scala flink app in an AWS EMR cluster (emr 5.17, hadoop 
>>> 2.8.4)  with flink parallelization set to 400. The source is a Kafka topic 
>>> and sinks to S3 in the format of: 
>>> s3:/. There's potentially 400 files 
>>> writing simultaneously.
>>>
>>> Configuration:
>>> - Flink v1.5.2
>>> - Checkpointing enabled w/ RocksDb (flink-statebackend-rocksdb_2.11, 
>>> v1.6.1). Interval is every 2 mins with max concurrent set to 1. Min pause 
>>> between checkpoints is 2 mins. Timeout is set to 2 mins.
>>> - BucketingSink (flink-connector-filesystem_2.11, v1.6.1).
>>> - Batch file size is set to 5mb.
>>> - Batch rollover interval is set to 30min
>>> - Writer uses GZip compression
>>> - Hadoop Maven Dependencies (hadoop-hdfs v3.1.1, hadoop-common v3.1.1, 
>>> hadoop-core v1.2.1, hadoop-aws v3.1.1)
>>>
>>> The app is able to run for hours straight, but occasionally (once or twice 
>>> a day), it displays the following exception. When this happens, the app is 
>>> able to recover from previous checkpoint, but I am concerned about the 
>>> exception:
>>>
>>> Caused by: java.io.IOException: 
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>>>  Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID: 
>>> , S3 Extended Request ID: xx
>>>
>>> - at 
>>> com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.handleAmazonServiceException(Jets3tNativeFileSystemStore.java:446)
>>>
>>> - at 
>>> com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:427)
>>>
>>> - at 
>>> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1297)
>>>
>>> - at 
>>> com.amazon.ws.emr.hadoop.fs.EmrFileSystem.rename(EmrFileSystem.java:312)
>>>
>>> - at 
>>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:815)
>>>
>>> Caused by: 
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>>>  Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID: 
>>> , S3 Extended Request ID: xx
>>>
>>> - at 
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1632)
>>>
>>> - at 
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClien

Re: Flink Kafka to BucketingSink to S3 - S3Exception

2018-10-30 Thread Flink Developer
Hi, thanks for the info Rafi, that seems to be related. I hope Flink version 
1.6.2 fixes this. Has anyone encountered this before?

I would also like to note that my jar includes a core-site.xml file that uses 
*s3a*. Is this the recommended configuration to use with BucketingSink? 
Should the sink be specified using s3a:/// or s3:///?

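<configuration>
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <property>
    <name>fs.s3a.buffer.dir</name>
    <value>/tmp</value>
  </property>
  <property>
    <name>fs.s3a.access.key</name>
    <value>x</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>x</value>
  </property>
  <property>
    <name>fs.s3a.buffer.dir</name>
    <value>/tmp</value>
  </property>
</configuration>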

And my pom.xml uses:

- flink-s3-fs-hadoop
- ...
- flink-statebackend-rocksdb_2.11
- ...
- hadoop-hdfs
- ...
- hadoop-common
- ...
- hadoop-core
- ...
- hadoop-aws
- ...

‐‐‐ Original Message ‐‐‐
On Sunday, October 28, 2018 8:08 AM, Rafi Aroch  wrote:

> Hi,
>
> I'm also experiencing this with Flink 1.5.2. This is probably related to 
> BucketingSink not working properly with S3 as filesystem because of the 
> eventual-consistency of S3.
>
> I see that https://issues.apache.org/jira/browse/FLINK-9752 will be part of 
> 1.6.2 release. It might help, if you use the flink-s3-fs-hadoop (and not 
> presto).
>
> Does anyone know if this fix would solve this issue?
>
> Thanks,
> Rafi
>
> On Sun, Oct 28, 2018 at 12:08 AM Flink Developer 
>  wrote:
>
>> Hi, I'm running a scala flink app in an AWS EMR cluster (emr 5.17, hadoop 
>> 2.8.4)  with flink parallelization set to 400. The source is a Kafka topic 
>> and sinks to S3 in the format of: s3:/. 
>> There's potentially 400 files writing simultaneously.
>>
>> Configuration:
>> - Flink v1.5.2
>> - Checkpointing enabled w/ RocksDb (flink-statebackend-rocksdb_2.11, 
>> v1.6.1). Interval is every 2 mins with max concurrent set to 1. Min pause 
>> between checkpoints is 2 mins. Timeout is set to 2 mins.
>> - BucketingSink (flink-connector-filesystem_2.11, v1.6.1).
>> - Batch file size is set to 5mb.
>> - Batch rollover interval is set to 30min
>> - Writer uses GZip compression
>> - Hadoop Maven Dependencies (hadoop-hdfs v3.1.1, hadoop-common v3.1.1, 
>> hadoop-core v1.2.1, hadoop-aws v3.1.1)
>>
>> The app is able to run for hours straight, but occasionally (once or twice a 
>> day), it displays the following exception. When this happens, the app is 
>> able to recover from previous checkpoint, but I am concerned about the 
>> exception:
>>
>> Caused by: java.io.IOException: 
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>>  Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID: 
>> , S3 Extended Request ID: xx
>>
>> - at 
>> com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.handleAmazonServiceException(Jets3tNativeFileSystemStore.java:446)
>>
>> - at 
>> com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:427)
>>
>> - at 
>> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1297)
>>
>> - at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.rename(EmrFileSystem.java:312)
>>
>> - at 
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:815)
>>
>> Caused by: 
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>>  Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID: 
>> , S3 Extended Request ID: xx
>>
>> - at 
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1632)
>>
>> - at 
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
>>
>> - at 
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058)
>>
>> - at 
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>>
>> - at 
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>>
>> - at 
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>>
>> - at 
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpCli

Flink Kafka to BucketingSink to S3 - S3Exception

2018-10-27 Thread Flink Developer
Hi, I'm running a Scala Flink app in an AWS EMR cluster (EMR 5.17, Hadoop 
2.8.4) with Flink parallelism set to 400. The source is a Kafka topic and 
the app sinks to S3 in the format of: s3:/. 
Potentially 400 files are being written simultaneously.

Configuration:
- Flink v1.5.2
- Checkpointing enabled w/ RocksDB (flink-statebackend-rocksdb_2.11, v1.6.1). 
Interval is every 2 mins with max concurrent set to 1. Min pause between 
checkpoints is 2 mins. Timeout is set to 2 mins.
- BucketingSink (flink-connector-filesystem_2.11, v1.6.1).
- Batch file size is set to 5 MB.
- Batch rollover interval is set to 30 min.
- Writer uses GZip compression
- Hadoop Maven Dependencies (hadoop-hdfs v3.1.1, hadoop-common v3.1.1, 
hadoop-core v1.2.1, hadoop-aws v3.1.1)

The app is able to run for hours straight, but occasionally (once or twice a 
day), it throws the following exception. When this happens, the app is able 
to recover from the previous checkpoint, but I am concerned about the exception:

Caused by: java.io.IOException: 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
 Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID: 
, S3 Extended Request ID: xx

- at 
com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.handleAmazonServiceException(Jets3tNativeFileSystemStore.java:446)

- at 
com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:427)

- at 
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1297)

- at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.rename(EmrFileSystem.java:312)

- at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:815)

Caused by: 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
 Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID: 
, S3 Extended Request ID: xx

- at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1632)

- at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)

- at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058)

- at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)

- at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)

- at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)

- at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)

- at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)

- at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)

- at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4365)

- at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4312)

- at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1270)

- at 
com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)

- at 
com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:8)

- at 
com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:91)

And sometimes, it will show this:

- java.lang.RuntimeException: Error while restoring BucketingSink state.

- at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:888)

- at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handleRestoredBucketState(BucketingSink.java:767)

- at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:394)

- at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)

- at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)

- at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)

- at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)

- at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)

- at 

Reading multiple files from S3 source in parallel

2018-10-23 Thread Flink Developer
Hello,
I'm interested in creating a Flink batch app that can process multiple files 
from an S3 source in parallel. Let's say I have the following S3 structure and 
that my Flink app has parallelism set to 3.
 s3://bucket/data-1/worker-1/file-1.txt
 s3://bucket/data-1/worker-1/file-2.txt
 s3://bucket/data-1/worker-2/file-1.txt
 s3://bucket/data-1/worker-2/file-2.txt
 s3://bucket/data-1/worker-3/file-1.txt
 s3://bucket/data-1/worker-3/file-2.txt

 s3://bucket/data-2/worker-1/file-1.txt
 s3://bucket/data-2/worker-1/file-2.txt
 s3://bucket/data-2/worker-2/file-1.txt
 s3://bucket/data-2/worker-2/file-2.txt
 s3://bucket/data-2/worker-3/file-1.txt
 s3://bucket/data-2/worker-3/file-2.txt

 s3://bucket/data-3/worker-1/file-1.txt
 s3://bucket/data-3/worker-1/file-2.txt
 s3://bucket/data-3/worker-2/file-1.txt
 s3://bucket/data-3/worker-2/file-2.txt
 s3://bucket/data-3/worker-3/file-1.txt
 s3://bucket/data-3/worker-3/file-2.txt

I would like the Flink workers to process these files in parallel. For example, 
Flink worker #1 should process only these files and in this order:
 s3://bucket/data-1/worker-1/file-1.txt
 s3://bucket/data-1/worker-1/file-2.txt
 s3://bucket/data-2/worker-1/file-1.txt
 s3://bucket/data-2/worker-1/file-2.txt
 s3://bucket/data-3/worker-1/file-1.txt
 s3://bucket/data-3/worker-1/file-2.txt

How can I configure the data source of the Flink app to handle this? Thank you 
for your help.
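
The assignment described above can be read as a grouping of the listed keys by 
their worker-N path segment, with each group kept in sorted order. Below is a 
minimal sketch in plain Java under that assumption. The class name 
WorkerFileAssignment and the method assign are hypothetical, the key layout 
data-X/worker-N/file-Y.txt is taken from the listing above, and nothing here 
calls into Flink or S3. How such a grouping would be wired into a Flink data 
source is left open.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WorkerFileAssignment {

    // Groups keys by their "worker-N" path segment (hypothetical helper).
    // Each group is sorted lexicographically, which matches the desired order
    // for the sample listing. Note that plain string order would place
    // "data-10" before "data-2", so larger numbers would need a numeric sort.
    public static Map<String, List<String>> assign(List<String> paths) {
        Map<String, List<String>> byWorker = new TreeMap<>();
        for (String path : paths) {
            String worker = null;
            for (String segment : path.split("/")) {
                if (segment.startsWith("worker-")) {
                    worker = segment;
                    break;
                }
            }
            if (worker == null) {
                throw new IllegalArgumentException("No worker segment in: " + path);
            }
            byWorker.computeIfAbsent(worker, k -> new ArrayList<>()).add(path);
        }
        for (List<String> files : byWorker.values()) {
            Collections.sort(files);
        }
        return byWorker;
    }

    public static void main(String[] args) {
        List<String> paths = Arrays.asList(
            "s3://bucket/data-2/worker-1/file-1.txt",
            "s3://bucket/data-1/worker-2/file-1.txt",
            "s3://bucket/data-1/worker-1/file-2.txt",
            "s3://bucket/data-1/worker-1/file-1.txt");
        // Print each worker with the files it would own, in processing order.
        for (Map.Entry<String, List<String>> entry : assign(paths).entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }
}
```

With the full listing above as input, the group for worker-1 would contain 
exactly the six files named in the question, in the order given there.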