[jira] [Closed] (BAHIR-238) Incomplete Error Message on Failure to Create AmazonSQS Client

2020-07-18 Thread Abhishek Dixit (Jira)


 [ 
https://issues.apache.org/jira/browse/BAHIR-238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhishek Dixit closed BAHIR-238.

Fix Version/s: Spark-2.4.0
   Resolution: Duplicate

Raised another issue which links to the PR for this fix.

> Incomplete Error Message on Failure to Create AmazonSQS Client
> --
>
> Key: BAHIR-238
> URL: https://issues.apache.org/jira/browse/BAHIR-238
> Project: Bahir
>  Issue Type: Bug
>  Components: Spark Structured Streaming Connectors
>Reporter: Abhishek Dixit
>Assignee: Abhishek Dixit
>Priority: Major
> Fix For: Spark-2.4.0
>
>
> Users have reported an incomplete error message ending in "null" on failure 
> to create the AmazonSQSClient. 
> {code:java}
> org.apache.spark.SparkException: Error occured while creating Amazon SQS 
> Client null
> at 
> org.apache.spark.sql.streaming.sqs.SqsClient.createSqsClient(SqsClient.scala:227)
> at org.apache.spark.sql.streaming.sqs.SqsClient.(SqsClient.scala:54)
> at org.apache.spark.sql.streaming.sqs.SqsSource.(SqsSource.scala:53)
> at 
> org.apache.spark.sql.streaming.sqs.SqsSourceProvider.createSource(SqsSourceProvider.scala:47)
> at 
> org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:255)
> at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:88)
> at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:85)
> at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:79)
> at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:85)
> at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:83)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:286)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:286)
> at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:71)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:285)
> at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
> at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
> at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
> at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:275)
> at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:83)
> at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:65)
> at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:269)
> at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
> {code}
> I'll create a PR for this fix.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BAHIR-239) Fix Bugs in SQSClient

2020-07-18 Thread Abhishek Dixit (Jira)


[ 
https://issues.apache.org/jira/browse/BAHIR-239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17160461#comment-17160461
 ] 

Abhishek Dixit commented on BAHIR-239:
--

PR for the issue: [https://github.com/apache/bahir/pull/99]

> Fix Bugs in SQSClient
> -
>
> Key: BAHIR-239
> URL: https://issues.apache.org/jira/browse/BAHIR-239
> Project: Bahir
>  Issue Type: Bug
>  Components: Spark Structured Streaming Connectors
>Affects Versions: Spark-2.4.0
>Reporter: Abhishek Dixit
>Assignee: Abhishek Dixit
>Priority: Major
>
> This Jira is to fix 2 bugs in SQSClient
> 1. Incomplete error message shows up when SQSClient fails to be created.
> {code:java}
> org.apache.spark.SparkException: Error occured while creating Amazon SQS 
> Client null
> {code}
> 2. AWS region is not honoured when authentication mode is keys.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BAHIR-239) Fix Bugs in SQSClient

2020-07-18 Thread Abhishek Dixit (Jira)


 [ 
https://issues.apache.org/jira/browse/BAHIR-239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhishek Dixit updated BAHIR-239:
-
Description: 
This Jira is to fix 2 bugs in SQSClient

1. Incomplete error message shows up when SQSClient fails to be created.
{code:java}
org.apache.spark.SparkException: Error occured while creating Amazon SQS Client 
null
{code}
2. AWS region is not honoured when authentication mode is keys.

 

  was:
This Jira is to fix 2 bugs in SQSClient

1. Incomplete error message shows up when SQSClient fails to be created.
{code:java}
// org.apache.spark.SparkException: Error occured while creating Amazon SQS 
Client null
{code}
2. AWS region is not honoured when authentication mode is keys.

 


> Fix Bugs in SQSClient
> -
>
> Key: BAHIR-239
> URL: https://issues.apache.org/jira/browse/BAHIR-239
> Project: Bahir
>  Issue Type: Bug
>  Components: Spark Structured Streaming Connectors
>Affects Versions: Spark-2.4.0
>Reporter: Abhishek Dixit
>Assignee: Abhishek Dixit
>Priority: Major
>
> This Jira is to fix 2 bugs in SQSClient
> 1. Incomplete error message shows up when SQSClient fails to be created.
> {code:java}
> org.apache.spark.SparkException: Error occured while creating Amazon SQS 
> Client null
> {code}
> 2. AWS region is not honoured when authentication mode is keys.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BAHIR-239) Fix Bugs in SQSClient

2020-07-18 Thread Abhishek Dixit (Jira)


[ 
https://issues.apache.org/jira/browse/BAHIR-239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17160451#comment-17160451
 ] 

Abhishek Dixit commented on BAHIR-239:
--

I'll raise a PR for this issue.

> Fix Bugs in SQSClient
> -
>
> Key: BAHIR-239
> URL: https://issues.apache.org/jira/browse/BAHIR-239
> Project: Bahir
>  Issue Type: Bug
>  Components: Spark Structured Streaming Connectors
>Affects Versions: Spark-2.4.0
>Reporter: Abhishek Dixit
>Assignee: Abhishek Dixit
>Priority: Major
>
> This Jira is to fix 2 bugs in SQSClient
> 1. Incomplete error message shows up when SQSClient fails to be created.
> {code:java}
> // org.apache.spark.SparkException: Error occured while creating Amazon SQS 
> Client null
> {code}
> 2. AWS region is not honoured when authentication mode is keys.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BAHIR-239) Fix Bugs in SQSClient

2020-07-18 Thread Abhishek Dixit (Jira)
Abhishek Dixit created BAHIR-239:


 Summary: Fix Bugs in SQSClient
 Key: BAHIR-239
 URL: https://issues.apache.org/jira/browse/BAHIR-239
 Project: Bahir
  Issue Type: Bug
  Components: Spark Structured Streaming Connectors
Affects Versions: Spark-2.4.0
Reporter: Abhishek Dixit
Assignee: Abhishek Dixit


This Jira is to fix 2 bugs in SQSClient

1. Incomplete error message shows up when SQSClient fails to be created.
{code:java}
// org.apache.spark.SparkException: Error occured while creating Amazon SQS 
Client null
{code}
2. AWS region is not honoured when authentication mode is keys.
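
Not the actual patch (see the PR linked above for that), but a minimal sketch, 
assuming the AWS SDK v1 builder API, of how both bugs could be addressed. The 
helper and variable names here are illustrative, not the connector's real ones:
{code:scala}
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder}
import org.apache.spark.SparkException

def createSqsClient(accessKey: String, secretKey: String, region: String): AmazonSQS =
  try {
    AmazonSQSClientBuilder.standard()
      .withCredentials(new AWSStaticCredentialsProvider(
        new BasicAWSCredentials(accessKey, secretKey)))
      .withRegion(region) // bug 2: set the region explicitly for key-based auth
      .build()
  } catch {
    case e: Exception =>
      // bug 1: e.getMessage can be null (e.g. for a NullPointerException),
      // which produced "... Client null"; use e.toString and keep the cause
      // so the original stack trace is preserved.
      throw new SparkException(
        s"Error occurred while creating Amazon SQS Client: $e", e)
  }
{code}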

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BAHIR-233) Add SNS message support for SQS structured streaming connector

2020-07-17 Thread Abhishek Dixit (Jira)


[ 
https://issues.apache.org/jira/browse/BAHIR-233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17159807#comment-17159807
 ] 

Abhishek Dixit commented on BAHIR-233:
--

[~Dmitry Gorbatsevich]

I have left a few comments on the PR. Can you please take a look at them?

> Add SNS message support for SQS structured streaming connector
> --
>
> Key: BAHIR-233
> URL: https://issues.apache.org/jira/browse/BAHIR-233
> Project: Bahir
>  Issue Type: New Feature
>  Components: Spark Structured Streaming Connectors
>Reporter: Dmitry Gorbatsevich
>Priority: Major
>
> h3. Motivation 
> The current implementation of the SQS streaming connector handles the 
> following "route" of the S3 notification event: 
> 1. S3 -> SQS -> Spark 
> This approach works just fine until you need multiple listeners (consumers) 
> for the same S3 path. When multiple applications need to listen to and 
> process the same S3 path, the following approach is recommended: 
> 2. S3 -> SNS -> SQS -> Spark 
> In this case we can route messages from one SNS topic to multiple different 
> SQS queues, which lets multiple applications listen to the same S3 path. With 
> approach #2, the original S3 notification is wrapped into an SNS message and 
> then delivered to the SQS queue (see the [AWS 
> docs|https://docs.aws.amazon.com/sns/latest/dg/sns-message-and-json-formats.html]
>  describing the SNS message format). 
> To extract the original S3 event from the SNS message, one needs to look at 
> the "Message" field in the JSON document. 
> h4. Proposed approach 
> # Add an option to the s3-sqs connector: "messageWrapper" 
> # It can be 'None' or 'SNS' 
> # Default value is 'None' 
>  
> If 'SNS' is specified, "unwrap" the original S3 notification event from the 
> SNS message and continue processing.
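
For reference, a sketch of the proposed "unwrap" step, assuming json4s 
(already on Spark's classpath) for JSON parsing; the helper name is 
hypothetical:
{code:scala}
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// An SNS-delivered notification wraps the original S3 event JSON as a
// string under the "Message" field; extract it so the rest of the body
// can be processed as a plain S3 notification.
def unwrapSnsMessage(body: String): String = {
  implicit val formats: Formats = DefaultFormats
  (parse(body) \ "Message").extract[String]
}
{code}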



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BAHIR-238) Incomplete Error Message on Failure to Create AmazonSQS Client

2020-06-29 Thread Abhishek Dixit (Jira)
Abhishek Dixit created BAHIR-238:


 Summary: Incomplete Error Message on Failure to Create AmazonSQS 
Client
 Key: BAHIR-238
 URL: https://issues.apache.org/jira/browse/BAHIR-238
 Project: Bahir
  Issue Type: Bug
  Components: Spark Structured Streaming Connectors
Reporter: Abhishek Dixit
Assignee: Abhishek Dixit


Users have reported an incomplete error message ending in "null" on failure to 
create the AmazonSQSClient. 
{code:java}
org.apache.spark.SparkException: Error occured while creating Amazon SQS Client 
null
at 
org.apache.spark.sql.streaming.sqs.SqsClient.createSqsClient(SqsClient.scala:227)
at org.apache.spark.sql.streaming.sqs.SqsClient.(SqsClient.scala:54)
at org.apache.spark.sql.streaming.sqs.SqsSource.(SqsSource.scala:53)
at 
org.apache.spark.sql.streaming.sqs.SqsSourceProvider.createSource(SqsSourceProvider.scala:47)
at 
org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:255)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:88)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:85)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:79)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:85)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:83)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:286)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:286)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:71)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:285)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:275)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:83)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:65)
at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:269)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
{code}
I'll create a PR for this fix.
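
For context, a plausible cause of the trailing "null" (a sketch of the failure 
mode, not the actual SqsClient code): the message of the caught exception is 
concatenated into the SparkException text, and getMessage can legitimately be 
null.
{code:scala}
// String concatenation renders a null message as the literal text "null".
val cause = new NullPointerException()  // cause.getMessage == null
val msg = "Error occured while creating Amazon SQS Client " + cause.getMessage
// msg == "Error occured while creating Amazon SQS Client null"
// (the spelling "occured" mirrors the actual log text above)
{code}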

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BAHIR-233) Add SNS message support for SQS structured streaming connector

2020-06-24 Thread Abhishek Dixit (Jira)


[ 
https://issues.apache.org/jira/browse/BAHIR-233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17144030#comment-17144030
 ] 

Abhishek Dixit commented on BAHIR-233:
--

This seems very useful, thanks for contributing it. I'll review the PR.

> Add SNS message support for SQS structured streaming connector
> --
>
> Key: BAHIR-233
> URL: https://issues.apache.org/jira/browse/BAHIR-233
> Project: Bahir
>  Issue Type: New Feature
>  Components: Spark Structured Streaming Connectors
>Reporter: Dmitry Gorbatsevich
>Priority: Major
>
> h3. Motivation 
> The current implementation of the SQS streaming connector handles the 
> following "route" of the S3 notification event: 
> 1. S3 -> SQS -> Spark 
> This approach works just fine until you need multiple listeners (consumers) 
> for the same S3 path. When multiple applications need to listen to and 
> process the same S3 path, the following approach is recommended: 
> 2. S3 -> SNS -> SQS -> Spark 
> In this case we can route messages from one SNS topic to multiple different 
> SQS queues, which lets multiple applications listen to the same S3 path. With 
> approach #2, the original S3 notification is wrapped into an SNS message and 
> then delivered to the SQS queue (see the [AWS 
> docs|https://docs.aws.amazon.com/sns/latest/dg/sns-message-and-json-formats.html]
>  describing the SNS message format). 
> To extract the original S3 event from the SNS message, one needs to look at 
> the "Message" field in the JSON document. 
> h4. Proposed approach 
> # Add an option to the s3-sqs connector: "messageWrapper" 
> # It can be 'None' or 'SNS' 
> # Default value is 'None' 
>  
> If 'SNS' is specified, "unwrap" the original S3 notification event from the 
> SNS message and continue processing.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BAHIR-222) Update Readme with details of SQL Streaming SQS connector

2019-12-29 Thread Abhishek Dixit (Jira)
Abhishek Dixit created BAHIR-222:


 Summary: Update Readme with details of SQL Streaming SQS connector
 Key: BAHIR-222
 URL: https://issues.apache.org/jira/browse/BAHIR-222
 Project: Bahir
  Issue Type: Task
  Components: Spark Structured Streaming Connectors
Affects Versions: Not Applicable
Reporter: Abhishek Dixit
Assignee: Abhishek Dixit
 Fix For: Not Applicable


Adding a link to the SQL Streaming SQS connector in the Bahir README.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BAHIR-222) Update Readme with details of SQL Streaming SQS connector

2019-12-29 Thread Abhishek Dixit (Jira)


[ 
https://issues.apache.org/jira/browse/BAHIR-222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17004733#comment-17004733
 ] 

Abhishek Dixit commented on BAHIR-222:
--

I'll raise a PR for this.

> Update Readme with details of SQL Streaming SQS connector
> -
>
> Key: BAHIR-222
> URL: https://issues.apache.org/jira/browse/BAHIR-222
> Project: Bahir
>  Issue Type: Task
>  Components: Spark Structured Streaming Connectors
>Affects Versions: Not Applicable
>Reporter: Abhishek Dixit
>Assignee: Abhishek Dixit
>Priority: Major
> Fix For: Not Applicable
>
>
> Adding a link to the SQL Streaming SQS connector in the Bahir README.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BAHIR-213) Faster S3 file Source for Structured Streaming with SQS

2019-08-30 Thread Abhishek Dixit (Jira)


[ 
https://issues.apache.org/jira/browse/BAHIR-213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16919860#comment-16919860
 ] 

Abhishek Dixit commented on BAHIR-213:
--

[~lresende] [~ste...@apache.org] The PR for this new extension has been open for 
quite some time with no reviews or reviewers assigned. Can you please review it 
or assign a reviewer?

Do I need to send an email to the Bahir mailing list for that?

> Faster S3 file Source for Structured Streaming with SQS
> ---
>
> Key: BAHIR-213
> URL: https://issues.apache.org/jira/browse/BAHIR-213
> Project: Bahir
>  Issue Type: New Feature
>  Components: Spark Structured Streaming Connectors
>Affects Versions: Spark-2.4.0
>Reporter: Abhishek Dixit
>Priority: Major
>
> Using FileStreamSource to read files from an S3 bucket has problems both in 
> terms of costs and latency:
>  * *Latency:* Listing all the files in S3 buckets every microbatch can be 
> both slow and resource intensive.
>  * *Costs:* Making List API requests to S3 every microbatch can be costly.
> The solution is to use Amazon Simple Queue Service (SQS), which lets you find 
> new files written to an S3 bucket without the need to list all the files every 
> microbatch.
> S3 buckets can be configured to send notifications to an Amazon SQS queue on 
> Object Create / Object Delete events. For details, see the AWS documentation: 
> [Configuring S3 Event 
> Notifications|https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html]
>  
> Spark can leverage this to find new files written to an S3 bucket by reading 
> notifications from the SQS queue instead of listing files every microbatch.
> I hope to contribute the changes proposed in [this pull 
> request|https://github.com/apache/spark/pull/24934] to Apache Bahir, as 
> suggested by [gaborgsomogyi|https://github.com/gaborgsomogyi] 
> [here|https://github.com/apache/spark/pull/24934#issuecomment-511389130]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (BAHIR-217) Install of Oracle JDK 8 Failing in Travis CI

2019-08-29 Thread Abhishek Dixit (Jira)
Abhishek Dixit created BAHIR-217:


 Summary: Install of Oracle JDK 8 Failing in Travis CI
 Key: BAHIR-217
 URL: https://issues.apache.org/jira/browse/BAHIR-217
 Project: Bahir
  Issue Type: Bug
  Components: Build
Reporter: Abhishek Dixit


Install of Oracle JDK 8 is failing in Travis CI. As a result, the build is 
failing for new pull requests.

We need to make a small fix in the _.travis.yml_ file, as described in the 
issue here:
https://travis-ci.community/t/install-of-oracle-jdk-8-failing/3038
We just need to add 
{code:java}
dist: trusty{code}
to the _.travis.yml_ file, as mentioned in the issue above.

I can raise a PR for this fix if required.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Comment Edited] (BAHIR-213) Faster S3 file Source for Structured Streaming with SQS

2019-07-31 Thread Abhishek Dixit (JIRA)


[ 
https://issues.apache.org/jira/browse/BAHIR-213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896994#comment-16896994
 ] 

Abhishek Dixit edited comment on BAHIR-213 at 7/31/19 9:56 AM:
---

* *Issue of FNFE when querying file:* I checked the behavior of the existing 
implementation of FileStreamSource for this scenario, i.e. if the file gets 
listed in the _getOffset_ phase but gets deleted before the file read happens 
in _getBatch_, Spark catches the FNFE and logs the following warning 
{code:java}
The directory $path was not found. Was it deleted very recently?{code}
I have gone with similar behavior in SqsSource. Let me know if you have any 
concerns about mimicking FileStreamSource behavior for this scenario.

 * *Issue of File State Update:* In the case of FileStreamSource, files are 
listed in the _getOffset_ phase, and if the contents of the same file change 
before the file is actually read in the _getBatch_ phase, then the initial 
content of the file is lost & not processed. Only the latest content of the 
file available during _getBatch_ is processed, & the SQS Source will behave 
similarly.

 * *Issue of Double Update:* On the other hand, if the file is processed in 
_getBatch_ and then later updated, FileStreamSource considers the file as 
seen/processed and doesn't read it again unless the file is aged. The SQS 
Source behaves in the same way, i.e. if an SQS message pertaining to an 
already processed file comes again, it is simply ignored.

 * *Issue of Messages arriving out of order:* By default, the new/unprocessed 
file list obtained from the SQS File Cache is sorted by timestamp (of file 
modification) to avoid the issue of messages arriving out of order. If some 
messages arrive very late, they will be processed in succeeding micro-batches. 
In any case, we can't guarantee that files are processed in timestamp order 
because of the distributed nature of SQS (a small sketch of this ordering step 
follows).
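
A minimal sketch of that ordering step, with hypothetical names (the real 
logic lives in the connector's SQS file cache):
{code:scala}
// Entries drained from the SQS file cache are sorted by the file's
// modification timestamp before a micro-batch is formed, so late-arriving
// messages simply fall into a later batch.
case class FileEntry(path: String, timestamp: Long)

def nextBatch(unprocessed: Seq[FileEntry], maxFilesPerTrigger: Int): Seq[FileEntry] =
  unprocessed.sortBy(_.timestamp).take(maxFilesPerTrigger)
{code}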

 

[~ste...@apache.org] Let me know if you have any concerns with any of the above 
points and want me to change the implementation in some way.


was (Author: abhishekd0907):
* *Issue of FNFE when querying file:* I checked the behavior of the existing 
implementation of FileStreamSource for this scenario, i.e. if the file gets 
listed in the _getOffset_ phase but gets deleted before the file read happens 
in _getBatch_, Spark catches the FNFE and logs the following warning 
{code:java}
The directory $path was not found. Was it deleted very recently?{code}
I have gone with similar behavior in SqsSource. Let me know if you have any 
concerns about mimicking FileStreamSource behavior for this scenario.

 * *Issue of File State Update:* In the case of FileStreamSource, files are 
listed in the _getOffset_ phase, and if the contents of the same file change 
before the file is actually read in the _getBatch_ phase, then the initial 
content of the file is lost & not processed. Only the latest content of the 
file available during _getBatch_ is processed, & the SQS Source will behave 
similarly.

 * *Issue of Double Update:* On the other hand, if the file is processed in 
_getBatch_ and then later updated, FileStreamSource considers the file as 
seen/processed and doesn't read it again unless the file is aged. The SQS 
Source behaves in the same way, i.e. if an SQS message pertaining to an 
already processed file comes again, it is simply ignored.

 * *Issue of Messages arriving out of order:* By default, the new/unprocessed 
file list obtained from the SQS File Cache is sorted by timestamp (of file 
modification) to avoid the issue of messages arriving out of order. If some 
messages arrive very late, they will be processed in succeeding micro-batches. 
In any case, we can't guarantee that files are processed in timestamp order 
because of the distributed nature of SQS. 

 

[~ste...@apache.org] Let me know if you have any concerns with any of the above 
points and want me to change the implementation in some way.

> Faster S3 file Source for Structured Streaming with SQS
> ---
>
> Key: BAHIR-213
> URL: https://issues.apache.org/jira/browse/BAHIR-213
> Project: Bahir
>  Issue Type: New Feature
>  Components: Spark Structured Streaming Connectors
>Affects Versions: Spark-2.4.0
>Reporter: Abhishek Dixit
>Priority: Major
>
> Using FileStreamSource to read files from an S3 bucket has problems both in 
> terms of costs and latency:
>  * *Latency:* Listing all the files in S3 buckets every microbatch can be 
> both slow and resource intensive.
>  * *Costs:* Making List API requests to S3 every microbatch can be costly.
> The solution is to use Amazon Simple Queue Service (SQS), which lets you find 
> new files written to an S3 bucket without the need to list all the files every 
> microbatch.
> S3 buckets can be 

[jira] [Updated] (BAHIR-213) Faster S3 file Source for Structured Streaming with SQS

2019-07-31 Thread Abhishek Dixit (JIRA)


 [ 
https://issues.apache.org/jira/browse/BAHIR-213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhishek Dixit updated BAHIR-213:
-
Description: 
Using FileStreamSource to read files from an S3 bucket has problems both in 
terms of costs and latency:
 * *Latency:* Listing all the files in S3 buckets every microbatch can be both 
slow and resource intensive.
 * *Costs:* Making List API requests to S3 every microbatch can be costly.

The solution is to use Amazon Simple Queue Service (SQS), which lets you find 
new files written to an S3 bucket without the need to list all the files every 
microbatch.

S3 buckets can be configured to send notifications to an Amazon SQS queue on 
Object Create / Object Delete events. For details, see the AWS documentation: 
[Configuring S3 Event 
Notifications|https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html]

Spark can leverage this to find new files written to an S3 bucket by reading 
notifications from the SQS queue instead of listing files every microbatch.

I hope to contribute the changes proposed in [this pull 
request|https://github.com/apache/spark/pull/24934] to Apache Bahir, as 
suggested by [gaborgsomogyi|https://github.com/gaborgsomogyi] 
[here|https://github.com/apache/spark/pull/24934#issuecomment-511389130]
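
To make the intended usage concrete, a hypothetical sketch of a streaming read 
through the proposed source; the format name and option keys are assumptions 
here, and the pull request defines the actual ones:
{code:scala}
// Discover new S3 files via SQS notifications instead of listing the
// bucket every micro-batch. `spark` is an active SparkSession and
// `eventSchema` is the schema of the files on S3.
val df = spark.readStream
  .format("s3-sqs")
  .schema(eventSchema)
  .option("sqsUrl", "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue")
  .option("region", "us-east-1")
  .option("fileFormat", "json")
  .load()
{code}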

  was:
Using FileStreamSource to read files from an S3 bucket has problems both in 
terms of costs and latency:
 * *Latency:* Listing all the files in S3 buckets every microbatch can be both 
slow and resource intensive.
 * *Costs:* Making List API requests to S3 every microbatch can be costly.

The solution is to use Amazon Simple Queue Service (SQS), which lets you find 
new files written to an S3 bucket without the need to list all the files every 
microbatch.

S3 buckets can be configured to send notifications to an Amazon SQS queue on 
Object Create / Object Delete events. For details, see the AWS documentation: 
[Configuring S3 Event 
Notifications|https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html]

Spark can leverage this to find new files written to an S3 bucket by reading 
notifications from the SQS queue instead of listing files every microbatch.

I hope to contribute the changes proposed in [this pull 
request|https://github.com/apache/spark/pull/24934] to Apache Bahir, as 
suggested by [gaborgsomogyi|https://github.com/gaborgsomogyi] 
[here|https://github.com/apache/spark/pull/24934#issuecomment-511389130]


> Faster S3 file Source for Structured Streaming with SQS
> ---
>
> Key: BAHIR-213
> URL: https://issues.apache.org/jira/browse/BAHIR-213
> Project: Bahir
>  Issue Type: New Feature
>  Components: Spark Structured Streaming Connectors
>Affects Versions: Spark-2.4.0
>Reporter: Abhishek Dixit
>Priority: Major
>
> Using FileStreamSource to read files from an S3 bucket has problems both in 
> terms of costs and latency:
>  * *Latency:* Listing all the files in S3 buckets every microbatch can be 
> both slow and resource intensive.
>  * *Costs:* Making List API requests to S3 every microbatch can be costly.
> The solution is to use Amazon Simple Queue Service (SQS), which lets you find 
> new files written to an S3 bucket without the need to list all the files every 
> microbatch.
> S3 buckets can be configured to send notifications to an Amazon SQS queue on 
> Object Create / Object Delete events. For details, see the AWS documentation: 
> [Configuring S3 Event 
> Notifications|https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html]
>  
> Spark can leverage this to find new files written to an S3 bucket by reading 
> notifications from the SQS queue instead of listing files every microbatch.
> I hope to contribute the changes proposed in [this pull 
> request|https://github.com/apache/spark/pull/24934] to Apache Bahir, as 
> suggested by [gaborgsomogyi|https://github.com/gaborgsomogyi] 
> [here|https://github.com/apache/spark/pull/24934#issuecomment-511389130]



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (BAHIR-213) Faster S3 file Source for Structured Streaming with SQS

2019-07-31 Thread Abhishek Dixit (JIRA)


[ 
https://issues.apache.org/jira/browse/BAHIR-213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896994#comment-16896994
 ] 

Abhishek Dixit edited comment on BAHIR-213 at 7/31/19 9:51 AM:
---

* *Issue of FNFE when querying file:* I checked the behavior of the existing 
implementation of FileStreamSource for this scenario, i.e. if the file gets 
listed in the _getOffset_ phase but gets deleted before the file read happens 
in _getBatch_, Spark catches the FNFE and logs the following warning 
{code:java}
The directory $path was not found. Was it deleted very recently?{code}
I have gone with similar behavior in SqsSource. Let me know if you have any 
concerns about mimicking FileStreamSource behavior for this scenario.

 * *Issue of File State Update:* In the case of FileStreamSource, files are 
listed in the _getOffset_ phase, and if the contents of the same file change 
before the file is actually read in the _getBatch_ phase, then the initial 
content of the file is lost & not processed. Only the latest content of the 
file available during _getBatch_ is processed, & the SQS Source will behave 
similarly.

 * *Issue of Double Update:* On the other hand, if the file is processed in 
_getBatch_ and then later updated, FileStreamSource considers the file as 
seen/processed and doesn't read it again unless the file is aged. The SQS 
Source behaves in the same way, i.e. if an SQS message pertaining to an 
already processed file comes again, it is simply ignored.

 * *Issue of Messages arriving out of order:* By default, the new/unprocessed 
file list obtained from the SQS File Cache is sorted by timestamp (of file 
modification) to avoid the issue of messages arriving out of order. If some 
messages arrive very late, they will be processed in succeeding micro-batches. 
In any case, we can't guarantee that files are processed in timestamp order 
because of the distributed nature of SQS. 

 

[~ste...@apache.org] Let me know if you have any concerns with any of the above 
points and want me to change the implementation in some way.


was (Author: abhishekd0907):
* *Issue of FNFE when querying file:* I checked the behavior of the existing 
implementation of FileStreamSource for this scenario, i.e. if the file gets 
listed in the _getOffset_ phase but gets deleted before the file read happens 
in _getBatch_, Spark catches the FNFE and logs the following warning 
{code:java}
The directory $path was not found. Was it deleted very recently?{code}
I have gone with similar behavior in SqsSource. Let me know if you have any 
concerns about mimicking FileStreamSource behavior for this scenario.

 * *Issue of File State Update:* In the case of FileStreamSource, files are 
listed in the _getOffset_ phase, and if the contents of the same file change 
before the file is actually read in the _getBatch_ phase, then the initial 
content of the file is lost & not processed. Only the latest content of the 
file available during _getBatch_ is processed, & the SQS Source will behave 
similarly.

 * *Issue of Double Update:* On the other hand, if the file is processed in 
_getBatch_ and then later updated, FileStreamSource considers the file as 
seen/processed and doesn't read it again unless the file is aged. The SQS 
Source behaves in the same way, i.e. if an SQS message pertaining to an 
already processed file comes again, it is simply ignored.

 * *Issue of Messages arriving out of order:* By default, the new/unprocessed 
file list obtained from the SQS File Cache is sorted by timestamp (of file 
modification) to avoid the issue of messages arriving out of order. If some 
messages arrive very late, they will be processed in succeeding micro-batches. 
In any case, we can't guarantee that files are processed in timestamp order 
because of the distributed nature of SQS. 

[~ste...@apache.org] Let me know if you have any concerns with any of the above 
points and want me to change the implementation in some way.

> Faster S3 file Source for Structured Streaming with SQS
> ---
>
> Key: BAHIR-213
> URL: https://issues.apache.org/jira/browse/BAHIR-213
> Project: Bahir
>  Issue Type: New Feature
>  Components: Spark Structured Streaming Connectors
>Affects Versions: Spark-2.4.0
>Reporter: Abhishek Dixit
>Priority: Major
>
> Using FileStreamSource to read files from an S3 bucket has problems both in 
> terms of costs and latency:
>  * *Latency:* Listing all the files in S3 buckets every microbatch can be 
> both slow and resource intensive.
>  * *Costs:* Making List API requests to S3 every microbatch can be costly.
> The solution is to use Amazon Simple Queue Service (SQS), which lets you find 
> new files written to an S3 bucket without the need to list all the files every 
> microbatch.
> S3 

[jira] [Comment Edited] (BAHIR-213) Faster S3 file Source for Structured Streaming with SQS

2019-07-31 Thread Abhishek Dixit (JIRA)


[ 
https://issues.apache.org/jira/browse/BAHIR-213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896994#comment-16896994
 ] 

Abhishek Dixit edited comment on BAHIR-213 at 7/31/19 9:50 AM:
---

* *Issue of FNFE when querying file:* I checked the behavior of the existing 
implementation of FileStreamSource for this scenario, i.e. if the file gets 
listed in the _getOffset_ phase but gets deleted before the file read happens 
in _getBatch_, Spark catches the FNFE and logs the following warning 
{code:java}
The directory $path was not found. Was it deleted very recently?{code}
I have gone with similar behavior in SqsSource. Let me know if you have any 
concerns about mimicking FileStreamSource behavior for this scenario.

 * *Issue of File State Update:* In the case of FileStreamSource, files are 
listed in the _getOffset_ phase, and if the contents of the same file change 
before the file is actually read in the _getBatch_ phase, then the initial 
content of the file is lost & not processed. Only the latest content of the 
file available during _getBatch_ is processed, & the SQS Source will behave 
similarly.

 * *Issue of Double Update:* On the other hand, if the file is processed in 
_getBatch_ and then later updated, FileStreamSource considers the file as 
seen/processed and doesn't read it again unless the file is aged. The SQS 
Source behaves in the same way, i.e. if an SQS message pertaining to an 
already processed file comes again, it is simply ignored.

 * *Issue of Messages arriving out of order:* By default, the new/unprocessed 
file list obtained from the SQS File Cache is sorted by timestamp (of file 
modification) to avoid the issue of messages arriving out of order. If some 
messages arrive very late, they will be processed in succeeding micro-batches. 
In any case, we can't guarantee that files are processed in timestamp order 
because of the distributed nature of SQS. 

[~ste...@apache.org] Let me know if you have any concerns with any of the above 
points and want me to change the implementation in some way.


was (Author: abhishekd0907):
* *Issue of FNFE when querying file:* I checked the behavior of the existing 
implementation of FileStreamSource for this scenario, i.e. if the file gets 
listed in the _getOffset_ phase but gets deleted before the file read happens 
in _getBatch_, Spark catches the FNFE and logs the following warning 
{code:java}
The directory $path was not found. Was it deleted very recently?{code}
I have gone with similar behavior in SqsSource. Let me know if you have any 
concerns about mimicking FileStreamSource behavior for this scenario.

 * *Issue of File State Update:* In the case of FileStreamSource, files are 
listed in the _getOffset_ phase, and if the contents of the same file change 
before the file is actually read in the _getBatch_ phase, then the initial 
content of the file is lost & not processed. Only the latest content of the 
file available during _getBatch_ is processed, & the SQS Source will behave 
similarly.

 * *Issue of Double Update:* On the other hand, if the file is processed in 
_getBatch_ and then later updated, FileStreamSource considers the file as 
seen/processed and doesn't read it again unless the file is aged. The SQS 
Source behaves in the same way, i.e. if an SQS message pertaining to an 
already processed file comes again, it is simply ignored.

*Issue of Messages arriving out of order:* By default, the new/unprocessed 
file list obtained from the SQS File Cache is sorted by timestamp (of file 
modification) to avoid the issue of messages arriving out of order. If some 
messages arrive very late, they will be processed in succeeding micro-batches. 
In any case, we can't guarantee that files are processed in timestamp order 
because of the distributed nature of SQS. 

[~ste...@apache.org] Let me know if you have any concerns with any of the above 
points and want me to change the implementation in some way.

> Faster S3 file Source for Structured Streaming with SQS
> ---
>
> Key: BAHIR-213
> URL: https://issues.apache.org/jira/browse/BAHIR-213
> Project: Bahir
>  Issue Type: New Feature
>  Components: Spark Structured Streaming Connectors
>Affects Versions: Spark-2.4.0
>Reporter: Abhishek Dixit
>Priority: Major
>
> Using FileStreamSource to read files from an S3 bucket has problems both in 
> terms of costs and latency:
>  * *Latency:* Listing all the files in S3 buckets every microbatch can be 
> both slow and resource intensive.
>  * *Costs:* Making List API requests to S3 every microbatch can be costly.
> The solution is to use Amazon Simple Queue Service (SQS), which lets you find 
> new files written to an S3 bucket without the need to list all the files every 
> 

[jira] [Comment Edited] (BAHIR-213) Faster S3 file Source for Structured Streaming with SQS

2019-07-31 Thread Abhishek Dixit (JIRA)


[ 
https://issues.apache.org/jira/browse/BAHIR-213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896997#comment-16896997
 ] 

Abhishek Dixit edited comment on BAHIR-213 at 7/31/19 9:49 AM:
---

I see that the pull request is not linked in the issue. Please find the pull 
request here:

[Faster S3 file Source for Structured Streaming with 
SQS|https://github.com/apache/bahir/pull/91]

cc [~ste...@apache.org]


was (Author: abhishekd0907):
I see that the pull request is not linked in the issue. Please find the pull 
request here:

[Faster S3 file Source for Structured Streaming with 
SQS|https://github.com/apache/bahir/pull/91]

 

cc [~ste...@apache.org]

> Faster S3 file Source for Structured Streaming with SQS
> ---
>
> Key: BAHIR-213
> URL: https://issues.apache.org/jira/browse/BAHIR-213
> Project: Bahir
>  Issue Type: New Feature
>  Components: Spark Structured Streaming Connectors
>Affects Versions: Spark-2.4.0
>Reporter: Abhishek Dixit
>Priority: Major
>
> Using FileStreamSource to read files from an S3 bucket has problems both in 
> terms of costs and latency:
>  * *Latency:* Listing all the files in S3 buckets every microbatch can be 
> both slow and resource intensive.
>  * *Costs:* Making List API requests to S3 every microbatch can be costly.
> The solution is to use Amazon Simple Queue Service (SQS), which lets you find 
> new files written to an S3 bucket without the need to list all the files every 
> microbatch.
> S3 buckets can be configured to send notifications to an Amazon SQS queue on 
> Object Create / Object Delete events. For details, see the AWS documentation: 
> [Configuring S3 Event 
> Notifications|https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html]
>  
> Spark can leverage this to find new files written to an S3 bucket by reading 
> notifications from the SQS queue instead of listing files every microbatch.
> I hope to contribute the changes proposed in [this pull 
> request|https://github.com/apache/spark/pull/24934] to Apache Bahir, as 
> suggested by [gaborgsomogyi|https://github.com/gaborgsomogyi] 
> [here|https://github.com/apache/spark/pull/24934#issuecomment-511389130]



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (BAHIR-213) Faster S3 file Source for Structured Streaming with SQS

2019-07-31 Thread Abhishek Dixit (JIRA)


[ 
https://issues.apache.org/jira/browse/BAHIR-213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896997#comment-16896997
 ] 

Abhishek Dixit edited comment on BAHIR-213 at 7/31/19 9:49 AM:
---

I see that the pull request is not linked in the issue. Please find the pull 
request here:

[Faster S3 file Source for Structured Streaming with 
SQS|https://github.com/apache/bahir/pull/91]

 

cc [~ste...@apache.org]


was (Author: abhishekd0907):
I see that the pull request is not linked in the issue. Please find the pull 
request here:

[Faster S3 file Source for Structured Streaming with 
SQS|https://github.com/apache/bahir/pull/91]

> Faster S3 file Source for Structured Streaming with SQS
> ---
>
> Key: BAHIR-213
> URL: https://issues.apache.org/jira/browse/BAHIR-213
> Project: Bahir
>  Issue Type: New Feature
>  Components: Spark Structured Streaming Connectors
>Affects Versions: Spark-2.4.0
>Reporter: Abhishek Dixit
>Priority: Major
>
> Using FileStreamSource to read files from an S3 bucket has problems both in 
> terms of costs and latency:
>  * *Latency:* Listing all the files in S3 buckets every microbatch can be 
> both slow and resource intensive.
>  * *Costs:* Making List API requests to S3 every microbatch can be costly.
> The solution is to use Amazon Simple Queue Service (SQS), which lets you find 
> new files written to an S3 bucket without the need to list all the files every 
> microbatch.
> S3 buckets can be configured to send notifications to an Amazon SQS queue on 
> Object Create / Object Delete events. For details, see the AWS documentation: 
> [Configuring S3 Event 
> Notifications|https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html]
>  
> Spark can leverage this to find new files written to an S3 bucket by reading 
> notifications from the SQS queue instead of listing files every microbatch.
> I hope to contribute the changes proposed in [this pull 
> request|https://github.com/apache/spark/pull/24934] to Apache Bahir, as 
> suggested by [gaborgsomogyi|https://github.com/gaborgsomogyi] 
> [here|https://github.com/apache/spark/pull/24934#issuecomment-511389130]



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (BAHIR-213) Faster S3 file Source for Structured Streaming with SQS

2019-07-31 Thread Abhishek Dixit (JIRA)


[ 
https://issues.apache.org/jira/browse/BAHIR-213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896994#comment-16896994
 ] 

Abhishek Dixit edited comment on BAHIR-213 at 7/31/19 9:49 AM:
---

* *Issue of FNFE when querying file:* I checked the behavior of the existing 
implementation of FileStreamSource for this scenario, i.e. if the file gets 
listed in the _getOffset_ phase but gets deleted before the file read happens 
in _getBatch_, Spark catches the FNFE and logs the following warning 
{code:java}
The directory $path was not found. Was it deleted very recently?{code}
I have gone with similar behavior in SqsSource. Let me know if you have any 
concerns about mimicking FileStreamSource behavior for this scenario.

 * *Issue of File State Update:* In the case of FileStreamSource, files are 
listed in the _getOffset_ phase, and if the contents of the same file change 
before the file is actually read in the _getBatch_ phase, then the initial 
content of the file is lost & not processed. Only the latest content of the 
file available during _getBatch_ is processed, & the SQS Source will behave 
similarly.

 * *Issue of Double Update:* On the other hand, if the file is processed in 
_getBatch_ and then later updated, FileStreamSource considers the file as 
seen/processed and doesn't read it again unless the file is aged. The SQS 
Source behaves in the same way, i.e. if an SQS message pertaining to an 
already processed file comes again, it is simply ignored.

*Issue of Messages arriving out of order:* By default, the new/unprocessed 
file list obtained from the SQS File Cache is sorted by timestamp (of file 
modification) to avoid the issue of messages arriving out of order. If some 
messages arrive very late, they will be processed in succeeding micro-batches. 
In any case, we can't guarantee that files are processed in timestamp order 
because of the distributed nature of SQS. 

[~ste...@apache.org] Let me know if you have any concerns with any of the above 
points and want me to change the implementation in some way.


was (Author: abhishekd0907):
* *Issue of FNFE when querying file:* I checked the behavior of the existing 
implementation of FileStreamSource for this scenario, i.e. if the file gets 
listed in the _getOffset_ phase but gets deleted before the file read happens 
in _getBatch_, Spark catches the FNFE and logs the following warning 
{code:java}
The directory $path was not found. Was it deleted very recently?{code}
I have gone with similar behavior in SqsSource. Let me know if you have any 
concerns about mimicking FileStreamSource behavior for this scenario.

 * *Issue of File State Update:* In the case of FileStreamSource, files are 
listed in the _getOffset_ phase, and if the contents of the same file change 
before the file is actually read in the _getBatch_ phase, then the initial 
content of the file is lost & not processed. Only the latest content of the 
file available during _getBatch_ is processed, & the SQS Source will behave 
similarly.

 * *Issue of Double Update:* On the other hand, if the file is processed in 
_getBatch_ and then later updated, FileStreamSource considers the file as 
seen/processed and doesn't read it again unless the file is aged. The SQS 
Source behaves in the same way, i.e. if an SQS message pertaining to an 
already processed file comes again, it is simply ignored.

*Issue of Messages arriving out of order:* By default, the new/unprocessed 
file list obtained from the SQS File Cache is sorted by timestamp (of file 
modification) to avoid the issue of messages arriving out of order. If some 
messages arrive very late, they will be processed in succeeding micro-batches. 
In any case, we can't guarantee that files are processed in timestamp order 
because of the distributed nature of SQS.

 

[~ste...@apache.org] Let me know if you have any concerns with any of the above 
points and want me to change the implementation in some way.

> Faster S3 file Source for Structured Streaming with SQS
> ---
>
> Key: BAHIR-213
> URL: https://issues.apache.org/jira/browse/BAHIR-213
> Project: Bahir
>  Issue Type: New Feature
>  Components: Spark Structured Streaming Connectors
>Affects Versions: Spark-2.4.0
>Reporter: Abhishek Dixit
>Priority: Major
>
> Using FileStreamSource to read files from an S3 bucket has problems both in 
> terms of costs and latency:
>  * *Latency:* Listing all the files in S3 buckets every microbatch can be 
> both slow and resource intensive.
>  * *Costs:* Making List API requests to S3 every microbatch can be costly.
> The solution is to use Amazon Simple Queue Service (SQS), which lets you find 
> new files written to an S3 bucket without the need to list all the files every 
> microbatch.
> 

[jira] [Comment Edited] (BAHIR-213) Faster S3 file Source for Structured Streaming with SQS

2019-07-31 Thread Abhishek Dixit (JIRA)


[ 
https://issues.apache.org/jira/browse/BAHIR-213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896997#comment-16896997
 ] 

Abhishek Dixit edited comment on BAHIR-213 at 7/31/19 9:48 AM:
---

I see that the pull request is not linked in the issue. Please find the pull 
request here:

[Faster S3 file Source for Structured Streaming with 
SQS|https://github.com/apache/bahir/pull/91]


was (Author: abhishekd0907):
I see that the pull request is not linked in the issue. Please find the pull 
request here:

[[BAHIR-213] Faster S3 file Source for Structured Streaming with 
SQS|https://github.com/apache/bahir/pull/91]

> Faster S3 file Source for Structured Streaming with SQS
> ---
>
> Key: BAHIR-213
> URL: https://issues.apache.org/jira/browse/BAHIR-213
> Project: Bahir
>  Issue Type: New Feature
>  Components: Spark Structured Streaming Connectors
>Affects Versions: Spark-2.4.0
>Reporter: Abhishek Dixit
>Priority: Major
>
> Using FileStreamSource to read files from an S3 bucket has problems both in 
> terms of costs and latency:
>  * *Latency:* Listing all the files in S3 buckets every microbatch can be 
> both slow and resource intensive.
>  * *Costs:* Making List API requests to S3 every microbatch can be costly.
> The solution is to use Amazon Simple Queue Service (SQS), which lets you find 
> new files written to an S3 bucket without the need to list all the files every 
> microbatch.
> S3 buckets can be configured to send notifications to an Amazon SQS queue on 
> Object Create / Object Delete events. For details, see the AWS documentation: 
> [Configuring S3 Event 
> Notifications|https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html]
>  
> Spark can leverage this to find new files written to an S3 bucket by reading 
> notifications from the SQS queue instead of listing files every microbatch.
> I hope to contribute the changes proposed in [this pull 
> request|https://github.com/apache/spark/pull/24934] to Apache Bahir, as 
> suggested by [gaborgsomogyi|https://github.com/gaborgsomogyi] 
> [here|https://github.com/apache/spark/pull/24934#issuecomment-511389130]



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (BAHIR-213) Faster S3 file Source for Structured Streaming with SQS

2019-07-31 Thread Abhishek Dixit (JIRA)


[ 
https://issues.apache.org/jira/browse/BAHIR-213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896997#comment-16896997
 ] 

Abhishek Dixit commented on BAHIR-213:
--

I see that the pull request is not linked in the issue. Please find the pull 
request here:

[[BAHIR-213] Faster S3 file Source for Structured Streaming with 
SQS|https://github.com/apache/bahir/pull/91]

> Faster S3 file Source for Structured Streaming with SQS
> ---
>
> Key: BAHIR-213
> URL: https://issues.apache.org/jira/browse/BAHIR-213
> Project: Bahir
>  Issue Type: New Feature
>  Components: Spark Structured Streaming Connectors
>Affects Versions: Spark-2.4.0
>Reporter: Abhishek Dixit
>Priority: Major
>
> Using FileStreamSource to read files from an S3 bucket has problems both in 
> terms of costs and latency:
>  * *Latency:* Listing all the files in S3 buckets every microbatch can be 
> both slow and resource intensive.
>  * *Costs:* Making List API requests to S3 every microbatch can be costly.
> The solution is to use Amazon Simple Queue Service (SQS), which lets you find 
> new files written to an S3 bucket without the need to list all the files every 
> microbatch.
> S3 buckets can be configured to send notifications to an Amazon SQS queue on 
> Object Create / Object Delete events. For details, see the AWS documentation: 
> [Configuring S3 Event 
> Notifications|https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html]
>  
> Spark can leverage this to find new files written to an S3 bucket by reading 
> notifications from the SQS queue instead of listing files every microbatch.
> I hope to contribute the changes proposed in [this pull 
> request|https://github.com/apache/spark/pull/24934] to Apache Bahir, as 
> suggested by [gaborgsomogyi|https://github.com/gaborgsomogyi] 
> [here|https://github.com/apache/spark/pull/24934#issuecomment-511389130]



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (BAHIR-213) Faster S3 file Source for Structured Streaming with SQS

2019-07-31 Thread Abhishek Dixit (JIRA)


[ 
https://issues.apache.org/jira/browse/BAHIR-213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896994#comment-16896994
 ] 

Abhishek Dixit commented on BAHIR-213:
--

* *Issue of FNFE when querying file:* I checked the behavior of the existing 
implementation of FileStreamSource for this scenario, i.e. if the file gets 
listed in the _getOffset_ phase but gets deleted before the file read happens 
in _getBatch_, Spark catches the FNFE and logs the following warning 
{code:java}
The directory $path was not found. Was it deleted very recently?{code}
I have gone with similar behavior in SqsSource. Let me know if you have any 
concerns about mimicking FileStreamSource behavior for this scenario.

 * *Issue of File State Update:* In the case of FileStreamSource, files are 
listed in the _getOffset_ phase, and if the contents of the same file change 
before the file is actually read in the _getBatch_ phase, then the initial 
content of the file is lost & not processed. Only the latest content of the 
file available during _getBatch_ is processed, & the SQS Source will behave 
similarly.

 * *Issue of Double Update:* On the other hand, if the file is processed in 
_getBatch_ and then later updated, FileStreamSource considers the file as 
seen/processed and doesn't read it again unless the file is aged. The SQS 
Source behaves in the same way, i.e. if an SQS message pertaining to an 
already processed file comes again, it is simply ignored.

*Issue of Messages arriving out of order:* By default, the new/unprocessed 
file list obtained from the SQS File Cache is sorted by timestamp (of file 
modification) to avoid the issue of messages arriving out of order. If some 
messages arrive very late, they will be processed in succeeding micro-batches. 
In any case, we can't guarantee that files are processed in timestamp order 
because of the distributed nature of SQS.

 

[~ste...@apache.org] Let me know if you have any concerns with any of the above 
points and want me to change the implementation in some way.

> Faster S3 file Source for Structured Streaming with SQS
> ---
>
> Key: BAHIR-213
> URL: https://issues.apache.org/jira/browse/BAHIR-213
> Project: Bahir
>  Issue Type: New Feature
>  Components: Spark Structured Streaming Connectors
>Affects Versions: Spark-2.4.0
>Reporter: Abhishek Dixit
>Priority: Major
>
> Using FileStreamSource to read files from an S3 bucket has problems both in 
> terms of costs and latency:
>  * *Latency:* Listing all the files in S3 buckets every microbatch can be 
> both slow and resource intensive.
>  * *Costs:* Making List API requests to S3 every microbatch can be costly.
> The solution is to use Amazon Simple Queue Service (SQS), which lets you find 
> new files written to an S3 bucket without the need to list all the files every 
> microbatch.
> S3 buckets can be configured to send notifications to an Amazon SQS queue on 
> Object Create / Object Delete events. For details, see the AWS documentation: 
> [Configuring S3 Event 
> Notifications|https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html]
>  
> Spark can leverage this to find new files written to an S3 bucket by reading 
> notifications from the SQS queue instead of listing files every microbatch.
> I hope to contribute the changes proposed in [this pull 
> request|https://github.com/apache/spark/pull/24934] to Apache Bahir, as 
> suggested by [gaborgsomogyi|https://github.com/gaborgsomogyi] 
> [here|https://github.com/apache/spark/pull/24934#issuecomment-511389130]



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (BAHIR-213) Faster S3 file Source for Structured Streaming with SQS

2019-07-26 Thread Abhishek Dixit (JIRA)


 [ 
https://issues.apache.org/jira/browse/BAHIR-213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhishek Dixit updated BAHIR-213:
-
Affects Version/s: (was: Spark-2.3.0)

> Faster S3 file Source for Structured Streaming with SQS
> ---
>
> Key: BAHIR-213
> URL: https://issues.apache.org/jira/browse/BAHIR-213
> Project: Bahir
>  Issue Type: New Feature
>  Components: Spark Structured Streaming Connectors
>Affects Versions: Spark-2.4.0
>Reporter: Abhishek Dixit
>Priority: Major
>
> Using FileStreamSource to read files from an S3 bucket has problems both in 
> terms of costs and latency:
>  * *Latency:* Listing all the files in S3 buckets every microbatch can be 
> both slow and resource intensive.
>  * *Costs:* Making List API requests to S3 every microbatch can be costly.
> The solution is to use Amazon Simple Queue Service (SQS), which lets you find 
> new files written to an S3 bucket without the need to list all the files every 
> microbatch.
> S3 buckets can be configured to send notifications to an Amazon SQS queue on 
> Object Create / Object Delete events. For details, see the AWS documentation: 
> [Configuring S3 Event 
> Notifications|https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html]
>  
> Spark can leverage this to find new files written to an S3 bucket by reading 
> notifications from the SQS queue instead of listing files every microbatch.
> I hope to contribute the changes proposed in [this pull 
> request|https://github.com/apache/spark/pull/24934] to Apache Bahir, as 
> suggested by [gaborgsomogyi|https://github.com/gaborgsomogyi] 
> [here|https://github.com/apache/spark/pull/24934#issuecomment-511389130]



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (BAHIR-213) Faster S3 file Source for Structured Streaming with SQS

2019-07-24 Thread Abhishek Dixit (JIRA)


 [ 
https://issues.apache.org/jira/browse/BAHIR-213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhishek Dixit updated BAHIR-213:
-
Description: 
Using FileStreamSource to read files from an S3 bucket has problems in terms 
of both cost and latency:
 * *Latency:* Listing all the files in S3 buckets every microbatch can be both 
slow and resource intensive.
 * *Costs:* Making List API requests to S3 every microbatch can be costly.

The solution is to use Amazon Simple Queue Service (SQS), which lets you find 
new files written to an S3 bucket without listing all the files every 
microbatch.

S3 buckets can be configured to send notifications to an Amazon SQS queue on 
Object Create / Object Delete events. For details, see the AWS documentation: 
[Configuring S3 Event 
Notifications|https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html]

Spark can leverage this to find new files written to an S3 bucket by reading 
notifications from the SQS queue instead of listing files every microbatch.

I hope to contribute the changes proposed in [this pull 
request|https://github.com/apache/spark/pull/24934] to Apache Bahir, as 
suggested by [gaborgsomogyi|https://github.com/gaborgsomogyi] 
[here|https://github.com/apache/spark/pull/24934#issuecomment-511389130]

  was:
Using FileStreamSource to read files from an S3 bucket has problems in terms 
of both cost and latency:
 * *Latency:* Listing all the files in S3 buckets every microbatch can be both 
slow and resource intensive.
 * *Costs:* Making List API requests to S3 every microbatch can be costly.

The solution is to use Amazon Simple Queue Service (SQS), which lets you find 
new files written to an S3 bucket without listing all the files every 
microbatch.

S3 buckets can be configured to send notifications to an Amazon SQS queue on 
Object Create / Object Delete events. For details, see the AWS documentation: 
[Configuring S3 Event 
Notifications|https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html]

Spark can leverage this to find new files written to an S3 bucket by reading 
notifications from the SQS queue instead of listing files every microbatch.

 

I hope to contribute the changes proposed in [this pull 
request|https://github.com/apache/spark/pull/24934] to Apache Bahir, as 
suggested by [gaborgsomogyi|https://github.com/gaborgsomogyi] 
[here|https://github.com/apache/spark/pull/24934#issuecomment-511389130]

 

 


> Faster S3 file Source for Structured Streaming with SQS
> ---
>
> Key: BAHIR-213
> URL: https://issues.apache.org/jira/browse/BAHIR-213
> Project: Bahir
>  Issue Type: New Feature
>  Components: Spark Structured Streaming Connectors
>Affects Versions: Spark-2.3.0, Spark-2.4.0
>Reporter: Abhishek Dixit
>Priority: Major
>
> Using FileStreamSource to read files from an S3 bucket has problems in 
> terms of both cost and latency:
>  * *Latency:* Listing all the files in S3 buckets every microbatch can be 
> both slow and resource intensive.
>  * *Costs:* Making List API requests to S3 every microbatch can be costly.
> The solution is to use Amazon Simple Queue Service (SQS), which lets you 
> find new files written to an S3 bucket without listing all the files every 
> microbatch.
> S3 buckets can be configured to send notifications to an Amazon SQS queue 
> on Object Create / Object Delete events. For details, see the AWS 
> documentation: [Configuring S3 Event 
> Notifications|https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html]
>  
> Spark can leverage this to find new files written to an S3 bucket by 
> reading notifications from the SQS queue instead of listing files every 
> microbatch.
> I hope to contribute the changes proposed in [this pull 
> request|https://github.com/apache/spark/pull/24934] to Apache Bahir, as 
> suggested by [gaborgsomogyi|https://github.com/gaborgsomogyi] 
> [here|https://github.com/apache/spark/pull/24934#issuecomment-511389130]



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (BAHIR-213) Faster S3 file Source for Structured Streaming with SQS

2019-07-24 Thread Abhishek Dixit (JIRA)


 [ 
https://issues.apache.org/jira/browse/BAHIR-213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhishek Dixit updated BAHIR-213:
-
Description: 
Using FileStreamSource to read files from an S3 bucket has problems in terms 
of both cost and latency:
 * *Latency:* Listing all the files in S3 buckets every microbatch can be both 
slow and resource intensive.
 * *Costs:* Making List API requests to S3 every microbatch can be costly.

The solution is to use Amazon Simple Queue Service (SQS), which lets you find 
new files written to an S3 bucket without listing all the files every 
microbatch.

S3 buckets can be configured to send notifications to an Amazon SQS queue on 
Object Create / Object Delete events. For details, see the AWS documentation: 
[Configuring S3 Event 
Notifications|https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html]

Spark can leverage this to find new files written to an S3 bucket by reading 
notifications from the SQS queue instead of listing files every microbatch.

 

I hope to contribute the changes proposed in [this pull 
request|https://github.com/apache/spark/pull/24934] to Apache Bahir, as 
suggested by [gaborgsomogyi|https://github.com/gaborgsomogyi] 
[here|https://github.com/apache/spark/pull/24934#issuecomment-511389130]

 

 

  was:
Using FileStreamSource to read files from an S3 bucket has problems in terms 
of both cost and latency:
 * *Latency:* Listing all the files in S3 buckets every microbatch can be both 
slow and resource intensive.
 * *Costs:* Making List API requests to S3 every microbatch can be costly.

The solution is to use Amazon Simple Queue Service (SQS), which lets you find 
new files written to an S3 bucket without listing all the files every 
microbatch.

S3 buckets can be configured to send notifications to an Amazon SQS queue on 
Object Create / Object Delete events. For details, see the AWS documentation: 
[Configuring S3 Event 
Notifications|https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html]

Spark can leverage this to find new files written to an S3 bucket by reading 
notifications from the SQS queue instead of listing files every microbatch.

 

I hope to contribute [this PR|https://github.com/apache/spark/pull/24934] to 
Apache Bahir, as suggested by [gaborgsomogyi|https://github.com/gaborgsomogyi] 
[here|https://github.com/apache/spark/pull/24934#issuecomment-511389130]

 

 


> Faster S3 file Source for Structured Streaming with SQS
> ---
>
> Key: BAHIR-213
> URL: https://issues.apache.org/jira/browse/BAHIR-213
> Project: Bahir
>  Issue Type: New Feature
>  Components: Spark Structured Streaming Connectors
>Affects Versions: Spark-2.3.0, Spark-2.4.0
>Reporter: Abhishek Dixit
>Priority: Major
>
> Using FileStreamSource to read files from an S3 bucket has problems in 
> terms of both cost and latency:
>  * *Latency:* Listing all the files in S3 buckets every microbatch can be 
> both slow and resource intensive.
>  * *Costs:* Making List API requests to S3 every microbatch can be costly.
> The solution is to use Amazon Simple Queue Service (SQS), which lets you 
> find new files written to an S3 bucket without listing all the files every 
> microbatch.
> S3 buckets can be configured to send notifications to an Amazon SQS queue 
> on Object Create / Object Delete events. For details, see the AWS 
> documentation: [Configuring S3 Event 
> Notifications|https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html]
>  
> Spark can leverage this to find new files written to an S3 bucket by 
> reading notifications from the SQS queue instead of listing files every 
> microbatch.
>  
> I hope to contribute the changes proposed in [this pull 
> request|https://github.com/apache/spark/pull/24934] to Apache Bahir, as 
> suggested by [gaborgsomogyi|https://github.com/gaborgsomogyi] 
> [here|https://github.com/apache/spark/pull/24934#issuecomment-511389130]
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (BAHIR-213) Faster S3 file Source for Structured Streaming with SQS

2019-07-24 Thread Abhishek Dixit (JIRA)
Abhishek Dixit created BAHIR-213:


 Summary: Faster S3 file Source for Structured Streaming with SQS
 Key: BAHIR-213
 URL: https://issues.apache.org/jira/browse/BAHIR-213
 Project: Bahir
  Issue Type: New Feature
  Components: Spark Structured Streaming Connectors
Affects Versions: Spark-2.3.0, Spark-2.4.0
Reporter: Abhishek Dixit


Using FileStreamSource to read files from an S3 bucket has problems in terms 
of both cost and latency:
 * *Latency:* Listing all the files in S3 buckets every microbatch can be both 
slow and resource intensive.
 * *Costs:* Making List API requests to S3 every microbatch can be costly.

The solution is to use Amazon Simple Queue Service (SQS), which lets you find 
new files written to an S3 bucket without listing all the files every 
microbatch.

S3 buckets can be configured to send notifications to an Amazon SQS queue on 
Object Create / Object Delete events. For details, see the AWS documentation: 
[Configuring S3 Event 
Notifications|https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html]
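
For illustration, one way to wire up these notifications programmatically with 
the AWS SDK for Java v1 (the bucket name and queue ARN below are placeholders; 
the S3 console or CloudFormation works equally well):

{code:scala}
import java.util.EnumSet

import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.{BucketNotificationConfiguration,
  QueueConfiguration, S3Event}

object ConfigureS3Notifications {
  def main(args: Array[String]): Unit = {
    // Placeholders: substitute your own bucket and queue ARN.
    val bucket   = "my-input-bucket"
    val queueArn = "arn:aws:sqs:us-east-1:123456789012:my-s3-event-queue"

    val s3 = AmazonS3ClientBuilder.defaultClient()

    // Publish ObjectCreated events for the bucket to the SQS queue so a
    // streaming source can discover new files without listing the bucket.
    val config = new BucketNotificationConfiguration()
    config.addConfiguration(
      "newFileEvents",
      new QueueConfiguration(queueArn, EnumSet.of(S3Event.ObjectCreated)))

    s3.setBucketNotificationConfiguration(bucket, config)
  }
}
{code}

Note that the queue's access policy must also allow the bucket to send 
messages to it; that part is omitted here.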
 

Spark can leverage this to find new files written to an S3 bucket by reading 
notifications from the SQS queue instead of listing files every microbatch.
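
With notifications flowing, a Spark job can subscribe to the queue instead of 
listing the bucket. A minimal sketch follows; the format name and option keys 
are taken from the proposed connector and should be treated as assumptions 
until the Bahir documentation is final.

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object SqsSourceExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sqs-s3-source").getOrCreate()

    // Schema of the files landing in the bucket (example only).
    val schema = StructType(Seq(StructField("value", StringType)))

    val input = spark.readStream
      .format("s3-sqs")                 // proposed SQS-backed S3 source
      .schema(schema)
      .option("sqsUrl",
        "https://sqs.us-east-1.amazonaws.com/123456789012/my-s3-event-queue")
      .option("region", "us-east-1")
      .option("fileFormat", "json")
      .load()

    // Echo new records to the console as files arrive.
    input.writeStream
      .format("console")
      .start()
      .awaitTermination()
  }
}
{code}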

 

I hope to contribute [this PR|https://github.com/apache/spark/pull/24934] to 
Apache Bahir, as suggested by [gaborgsomogyi|https://github.com/gaborgsomogyi] 
[here|https://github.com/apache/spark/pull/24934#issuecomment-511389130]

 

 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)