[GitHub] bahir pull request #57: [BAHIR-128][WIP] fix failing sql-cloudant test
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/57#discussion_r157478713

--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala ---
@@ -67,7 +67,8 @@ class CloudantChangesConfig(protocol: String, host: String, dbName: String,
   }
   def getChangesReceiverUrl: String = {
-    var url = dbUrl + "/" + defaultIndex + "?include_docs=true&feed=continuous&timeout=" + timeout
+    var url = dbUrl + "/" + defaultIndex + "?include_docs=true&feed=normal" +
+      "&seq_interval=1&timeout=" + timeout
--- End diff --

WDYT about making the `seq_interval` the `bulkSize` instead of hard-coding it?

---
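A minimal sketch of the `bulkSize` suggestion, assuming `defaultIndex` resolves to the `_changes` endpoint and that the config exposes an integer `bulkSize` and a numeric `timeout`. The object and method names are hypothetical, and this is not the code committed in f758996. The query string is built from key/value pairs so that each parameter is explicit:

```scala
// Hypothetical sketch: build a one-shot ("normal") _changes URL whose
// seq_interval comes from the configured bulkSize instead of a hard-coded 1.
// dbUrl, defaultIndex, bulkSize and timeout stand in for CloudantChangesConfig
// fields; their exact names and types are assumptions.
object ChangesUrlSketch {
  def changesReceiverUrl(dbUrl: String,
                         defaultIndex: String,
                         bulkSize: Int,
                         timeout: Long): String = {
    require(bulkSize > 0, "bulkSize must be positive")
    val params = Seq(
      "include_docs" -> "true",
      "feed"         -> "normal",
      "seq_interval" -> bulkSize.toString, // compute the update seq once per bulk
      "timeout"      -> timeout.toString
    )
    // Values here are numbers or fixed strings, so no URL-encoding is applied.
    dbUrl + "/" + defaultIndex + "?" +
      params.map { case (k, v) => s"$k=$v" }.mkString("&")
  }
}
```

For example, `ChangesUrlSketch.changesReceiverUrl("https://example.cloudant.com/db", "_changes", 200, 60000L)` (a made-up host) yields `https://example.cloudant.com/db/_changes?include_docs=true&feed=normal&seq_interval=200&timeout=60000`. CouchDB's `seq_interval` tells the server to calculate the update sequence only for every Nth row of a batch, so setting it to the batch size avoids recomputing the sequence for every row.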
[GitHub] bahir pull request #57: [BAHIR-128][WIP] fix failing sql-cloudant test
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/57#discussion_r157479639

--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala ---
@@ -39,56 +37,38 @@ class ChangesReceiver(config: CloudantChangesConfig)
   }
   private def receive(): Unit = {
-    // Get total number of docs in database using _all_docs endpoint
-    val limit = new JsonStoreDataAccess(config)
-      .getTotalRows(config.getTotalUrl, queryUsed = false)
-
-    // Get continuous _changes url
+    // Get normal _changes url
     val url = config.getChangesReceiverUrl.toString
     val selector: String = {
       "{\"selector\":" + config.getSelector + "}"
     }
-    var count = 0
+    // var count = 0
--- End diff --

Delete this commented-out line?

---
[GitHub] bahir pull request #57: [BAHIR-128][WIP] fix failing sql-cloudant test
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/57#discussion_r157480164

--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala ---
@@ -125,7 +125,7 @@ class DefaultSource extends RelationProvider
       /* Create a streaming context to handle transforming docs in
        * larger databases into Spark datasets
        */
-      val ssc = new StreamingContext(sqlContext.sparkContext, Seconds(10))
+      val ssc = new StreamingContext(sqlContext.sparkContext, Seconds(8))
--- End diff --

I'm amazed this parameter was hard-coded, as it seems to be a fairly critical tuning parameter for streaming. I guess this is one for another PR.

---
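One way the follow-up PR could remove the hard-coding is to read the batch interval from the data source options. This is a minimal sketch: the option key `batchInterval` and the object name are hypothetical, not existing Bahir settings, and the 10-second default simply mirrors the value the diff replaces:

```scala
import scala.util.Try

// Hypothetical sketch: read the streaming batch interval (in seconds) from the
// data source options instead of hard-coding it. The "batchInterval" key and
// the 10-second default are illustrative assumptions.
object BatchIntervalSketch {
  val DefaultBatchIntervalSeconds: Long = 10L

  def batchIntervalSeconds(parameters: Map[String, String]): Long =
    parameters.get("batchInterval")
      .flatMap(v => Try(v.trim.toLong).toOption) // ignore non-numeric values
      .filter(_ > 0)                             // ignore zero or negative values
      .getOrElse(DefaultBatchIntervalSeconds)
}
```

Inside `createRelation(sqlContext, parameters)`, the result could be passed to Spark as `new StreamingContext(sqlContext.sparkContext, Seconds(BatchIntervalSketch.batchIntervalSeconds(parameters)))`. This sketch silently falls back to the default on bad input. Failing fast with an error would be a reasonable alternative for a tuning parameter this important.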
[GitHub] bahir pull request #57: [BAHIR-128][WIP] fix failing sql-cloudant test
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/57#discussion_r157484722

--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala ---
@@ -39,56 +37,38 @@ class ChangesReceiver(config: CloudantChangesConfig)
   }
   private def receive(): Unit = {
-    // Get total number of docs in database using _all_docs endpoint
-    val limit = new JsonStoreDataAccess(config)
-      .getTotalRows(config.getTotalUrl, queryUsed = false)
-
-    // Get continuous _changes url
+    // Get normal _changes url
--- End diff --

I'm a bit confused about this change. Since Spark Streaming is the basis for "real-time" or "continuous applications", doesn't this need to keep listening to the changes feed and wait for more changes?

---
[GitHub] bahir pull request #57: [BAHIR-128][WIP] fix failing sql-cloudant test
Github user emlaver commented on a diff in the pull request: https://github.com/apache/bahir/pull/57#discussion_r157533814

--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala ---
@@ -39,56 +37,38 @@ class ChangesReceiver(config: CloudantChangesConfig)
   }
   private def receive(): Unit = {
-    // Get total number of docs in database using _all_docs endpoint
-    val limit = new JsonStoreDataAccess(config)
-      .getTotalRows(config.getTotalUrl, queryUsed = false)
-
-    // Get continuous _changes url
+    // Get normal _changes url
--- End diff --

For our internal implementation, Mayya and I wanted the user to have a snapshot of the data to load into Spark. To make that possible, we decided to use a `continuous`-style feed with a doc limit. With the new _changes implementation from Mike's project, the `normal` feed is stable and works as expected. I've also reduced the number of requests and the load time by removing the HTTP request for the doc limit, since it isn't needed with a `normal`-style _changes feed. To work with data in "real time", you can use `CloudantReceiver`, which keeps a continuous changes feed open within the Spark Streaming context.

---
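To make the snapshot trade-off concrete, here is a toy model with no HTTP and no Spark, in which a feed is just an iterator of change rows. The object and method names are hypothetical, and this is not the Bahir implementation:

```scala
// Hypothetical sketch contrasting two ways of taking a snapshot from a
// _changes feed. Feeds are modelled as iterators of change rows.
object ChangesSnapshotSketch {
  // Continuous feed: the server keeps the connection open, so the iterator
  // may never end. A snapshot needs an external cap, such as the total row
  // count from the old _all_docs request, to know when to stop reading.
  def snapshotFromContinuous(rows: Iterator[String], limit: Int): List[String] =
    rows.take(limit).toList

  // Normal feed: the server returns the changes up to the current point and
  // closes the response, so the iterator is finite and can be fully drained.
  def snapshotFromNormal(rows: Iterator[String]): List[String] =
    rows.toList
}
```

An endless `Iterator.from(1).map(i => s"doc-$i")` stands in for a continuous feed. Without the `limit` it would never terminate, which is why the extra `_all_docs` request was needed before. With a `normal` feed, the end of the response itself marks the end of the snapshot.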
[GitHub] bahir pull request #57: [BAHIR-128][WIP] fix failing sql-cloudant test
Github user emlaver commented on a diff in the pull request: https://github.com/apache/bahir/pull/57#discussion_r157552982

--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala ---
@@ -39,56 +37,38 @@ class ChangesReceiver(config: CloudantChangesConfig)
   }
   private def receive(): Unit = {
-    // Get total number of docs in database using _all_docs endpoint
-    val limit = new JsonStoreDataAccess(config)
-      .getTotalRows(config.getTotalUrl, queryUsed = false)
-
-    // Get continuous _changes url
+    // Get normal _changes url
     val url = config.getChangesReceiverUrl.toString
     val selector: String = {
       "{\"selector\":" + config.getSelector + "}"
     }
-    var count = 0
+    // var count = 0
--- End diff --

Removed in f758996.

---
[GitHub] bahir issue #59: [BAHIR-138] fix deprecated warnings in sql-cloudant
Github user ApacheBahir commented on the issue: https://github.com/apache/bahir/pull/59 Refer to this link for build results (access rights to CI server needed): http://169.45.79.58:8080/job/bahir_spark_pr_builder/143/ ---
[GitHub] bahir issue #57: [BAHIR-128][WIP] fix failing sql-cloudant test
Github user ApacheBahir commented on the issue: https://github.com/apache/bahir/pull/57 ✅ Build successful ---
[GitHub] bahir pull request #57: [BAHIR-128][WIP] fix failing sql-cloudant test
Github user emlaver commented on a diff in the pull request: https://github.com/apache/bahir/pull/57#discussion_r157553098

--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala ---
@@ -67,7 +67,8 @@ class CloudantChangesConfig(protocol: String, host: String, dbName: String,
   }
   def getChangesReceiverUrl: String = {
-    var url = dbUrl + "/" + defaultIndex + "?include_docs=true&feed=continuous&timeout=" + timeout
+    var url = dbUrl + "/" + defaultIndex + "?include_docs=true&feed=normal" +
+      "&seq_interval=1&timeout=" + timeout
--- End diff --

Yes, I think that's a good idea. Fixed in f758996.

---