[jira] [Comment Edited] (BAHIR-213) Faster S3 file Source for Structured Streaming with SQS

2019-07-31 Thread Abhishek Dixit (JIRA)


[ 
https://issues.apache.org/jira/browse/BAHIR-213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896994#comment-16896994
 ] 

Abhishek Dixit edited comment on BAHIR-213 at 7/31/19 9:56 AM:
---

* *Issue of FNFE when querying file:* I checked the behavior of the existing 
implementation of FileStreamSource for this scenario, i.e. if the file gets 
listed in the _getOffset_ phase but is deleted before the file read happens in 
_getBatch_, Spark catches the FileNotFoundException (FNFE) and logs the 
following warning:
{code:java}
The directory $path was not found. Was it deleted very recently?{code}
I have gone with similar behavior in SqsSource (a sketch follows this list). 
Let me know if you have any concerns about mimicking the FileStreamSource 
behavior for this scenario.

 * *Issue of File State Update:* In the case of FileStreamSource, files are 
listed in the _getOffset_ phase, and if the contents of a file change before 
the file is actually read in the _getBatch_ phase, the initial content of the 
file is lost and not processed. Only the latest content available during 
_getBatch_ is processed, and SQS Source behaves the same way.

 * *Issue of Double Update:* On the other hand, if the file is processed in 
_getBatch_ and then updated later, FileStreamSource considers the file 
seen/processed and doesn't read it again unless the file entry is aged out. 
SQS Source behaves the same way, i.e. an SQS message pertaining to an already 
processed file is simply ignored (see the cache sketch after this list).

 * *Issue of Messages arriving out of order:* By default, the new/unprocessed 
file list obtained from the SQS file cache is sorted by timestamp (of the file 
update) to mitigate messages arriving out of order. If some messages arrive 
very late, they are processed in succeeding micro-batches. In any case, we 
can't guarantee that files are processed in timestamp order, because of the 
distributed nature of SQS.
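
To make the first point concrete, here is a minimal sketch of the intended 
behavior (the names are illustrative only, not the actual SqsSource code): 
the missing file is skipped with a warning instead of failing the micro-batch.
{code:scala}
import java.io.FileNotFoundException
import org.slf4j.LoggerFactory

// Illustrative only: mirror FileStreamSource's handling of a file that was
// listed in getOffset but deleted before getBatch could read it.
object FnfeHandlingSketch {
  private val log = LoggerFactory.getLogger(getClass)

  def readFileOrWarn(path: String, read: String => Seq[String]): Seq[String] =
    try read(path)
    catch {
      case _: FileNotFoundException =>
        log.warn(s"The directory $path was not found. Was it deleted very recently?")
        Seq.empty // skip the missing file; it is simply dropped from this batch
    }
}
{code}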
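And here is a hypothetical sketch of the cache semantics from the last two 
points; the class and method names are made up for illustration and are not 
the Bahir API.
{code:scala}
import scala.collection.mutable

// Hypothetical sketch of the SQS file cache semantics; not the actual API.
class SqsFileCacheSketch {
  private val processed = mutable.Map.empty[String, Long]   // path -> timestamp
  private val unprocessed = mutable.Map.empty[String, Long] // path -> timestamp

  // A message for an already processed file is ignored; a newer message for a
  // still-unprocessed file just refreshes its timestamp (latest content wins).
  def onMessage(path: String, timestamp: Long): Unit =
    if (!processed.contains(path)) unprocessed.update(path, timestamp)

  // Hand out unprocessed files sorted by file-update timestamp; late messages
  // land in a later batch, so global timestamp order is best effort only.
  def nextBatch(): Seq[(String, Long)] = {
    val batch = unprocessed.toSeq.sortBy(_._2)
    batch.foreach { case (path, ts) =>
      processed.update(path, ts)
      unprocessed.remove(path)
    }
    batch
  }
}
{code}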

 

[~ste...@apache.org] Let me know if you have any concerns with any of the above 
points and want me to change the implementation in some way.


[jira] [Updated] (BAHIR-213) Faster S3 file Source for Structured Streaming with SQS

2019-07-31 Thread Abhishek Dixit (JIRA)


 [ 
https://issues.apache.org/jira/browse/BAHIR-213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhishek Dixit updated BAHIR-213:
-
Description: 
Using FileStreamSource to read files from an S3 bucket has problems in terms 
of both cost and latency:
 * *Latency:* Listing all the files in S3 buckets every microbatch can be both 
slow and resource intensive.
 * *Costs:* Making List API requests to S3 every microbatch can be costly.

The solution is to use Amazon Simple Queue Service (SQS), which lets you find 
new files written to an S3 bucket without the need to list all the files every 
microbatch.

S3 buckets can be configured to send notifications to an Amazon SQS queue on 
Object Create / Object Delete events. For details, see the AWS documentation: 
[Configuring S3 Event 
Notifications|https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html].
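
For illustration, the sketch below polls such a queue with the AWS SDK v1 and 
prints what arrives; the queue URL is a placeholder, and JSON parsing is only 
indicated in a comment. It is not part of the proposed connector.
{code:scala}
import com.amazonaws.services.sqs.AmazonSQSClientBuilder
import com.amazonaws.services.sqs.model.ReceiveMessageRequest
import scala.collection.JavaConverters._

// Minimal sketch: drain the SQS queue that receives the bucket's
// ObjectCreated notifications. The queue URL below is a placeholder.
object SqsPollSketch {
  def main(args: Array[String]): Unit = {
    val queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/s3-events"
    val sqs = AmazonSQSClientBuilder.defaultClient()
    val request = new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10)
    sqs.receiveMessage(request).getMessages.asScala.foreach { m =>
      // The body is the S3 event JSON; the new file's key sits at
      // Records[0].s3.object.key (parse with a JSON library of your choice).
      println(m.getBody)
      sqs.deleteMessage(queueUrl, m.getReceiptHandle)
    }
  }
}
{code}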
 

Spark can leverage this to find new files written to the S3 bucket by reading 
notifications from the SQS queue instead of listing files every microbatch.
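
For illustration, a streaming query against the proposed source might look 
like the sketch below. The format name ("s3-sqs") and the option keys 
("sqsUrl", "fileFormat", "region") follow the linked pull request and should 
be treated as provisional until the connector is merged.
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructType}

// Provisional usage sketch of the proposed SQS-backed source; option names
// follow the pull request and may change before the connector is merged.
object SqsSourceUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("s3-sqs-sketch").getOrCreate()
    val schema = new StructType().add("id", LongType).add("payload", StringType)

    val events = spark.readStream
      .format("s3-sqs")
      .schema(schema)
      .option("sqsUrl", "https://sqs.us-east-1.amazonaws.com/123456789012/s3-events")
      .option("fileFormat", "json")
      .option("region", "us-east-1")
      .load()

    events.writeStream.format("console").start().awaitTermination()
  }
}
{code}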

I hope to contribute the changes proposed in [this pull 
request|https://github.com/apache/spark/pull/24934] to Apache Bahir, as 
suggested by [gaborgsomogyi|https://github.com/gaborgsomogyi] 
[here|https://github.com/apache/spark/pull/24934#issuecomment-511389130].

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (BAHIR-213) Faster S3 file Source for Structured Streaming with SQS

2019-07-31 Thread Abhishek Dixit (JIRA)


[ 
https://issues.apache.org/jira/browse/BAHIR-213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896997#comment-16896997
 ] 

Abhishek Dixit edited comment on BAHIR-213 at 7/31/19 9:49 AM:
---

I see that the pull request is not linked in the issue. Please find it here:

[Faster S3 file Source for Structured Streaming with 
SQS|https://github.com/apache/bahir/pull/91]

cc [~ste...@apache.org]


--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

