[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

2018-01-09 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/61#discussion_r160470451
  
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
@@ -16,34 +16,127 @@
  */
 package org.apache.bahir.cloudant
 
-import java.net.URLEncoder
+import java.net.{URL, URLEncoder}
 
-import play.api.libs.json.{JsArray, JsObject, Json, JsValue}
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.io.File
+
+import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database}
+import com.cloudant.client.api.model.SearchResult
+import com.cloudant.client.api.views._
+import com.cloudant.http.{Http, HttpConnection}
+import com.cloudant.http.interceptors.Replay429Interceptor
+import com.google.gson.{JsonObject, JsonParser}
 
 import org.apache.bahir.cloudant.common._
+import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter
 
 /*
 * Only allow one field pushdown now
 * as the filter today does not tell how to link the filters out And v.s. Or
 */
 
 class CloudantConfig(val protocol: String, val host: String,
- val dbName: String, val indexName: String, val viewName: String)
+ val dbName: String, val indexPath: String, val viewPath: String)
 (implicit val username: String, val password: String,
 val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
 val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
  val createDBOnSave: Boolean, val endpoint: String,
  val useQuery: Boolean = false, val queryLimit: Int)
   extends Serializable {
 
+  @transient private lazy val client: CloudantClient = ClientBuilder
+.url(getClientUrl)
+.username(username)
+.password(password)
+.interceptors(Replay429Interceptor.WITH_DEFAULTS)
--- End diff --

Also the defaults may be insufficient for some use cases. It may be worth exposing config option(s) for at least the number of retries if not the backoff.
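
For illustration, a sketch of what exposing those options might look like (the option names in the comments are hypothetical, and the two-argument (numberOfReplays, initialBackoff) constructor of Replay429Interceptor is assumed):

    import com.cloudant.client.api.ClientBuilder
    import com.cloudant.http.interceptors.Replay429Interceptor

    // Hypothetical values that would come from new config options
    // rather than being hard-coded
    val numberOfRetries: Int = 5       // e.g. a "cloudant.numberOfRetries" option
    val initialBackoffMs: Long = 250L  // e.g. a "cloudant.retryInitialBackoff" option

    val client = ClientBuilder
      .url(new java.net.URL("https://example.cloudant.com")) // placeholder URL
      .username("username")
      .password("password")
      .interceptors(new Replay429Interceptor(numberOfRetries, initialBackoffMs))
      .build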


---


[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

2018-01-05 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/61#discussion_r159845769
  
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
@@ -16,34 +16,127 @@
  */
 package org.apache.bahir.cloudant
 
-import java.net.URLEncoder
+import java.net.{URL, URLEncoder}
 
-import play.api.libs.json.{JsArray, JsObject, Json, JsValue}
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.io.File
+
+import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database}
+import com.cloudant.client.api.model.SearchResult
+import com.cloudant.client.api.views._
+import com.cloudant.http.{Http, HttpConnection}
+import com.cloudant.http.interceptors.Replay429Interceptor
+import com.google.gson.{JsonObject, JsonParser}
 
 import org.apache.bahir.cloudant.common._
+import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter
 
 /*
 * Only allow one field pushdown now
 * as the filter today does not tell how to link the filters out And v.s. Or
 */
 
 class CloudantConfig(val protocol: String, val host: String,
- val dbName: String, val indexName: String, val viewName: String)
+ val dbName: String, val indexPath: String, val viewPath: String)
 (implicit val username: String, val password: String,
 val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
 val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
  val createDBOnSave: Boolean, val endpoint: String,
  val useQuery: Boolean = false, val queryLimit: Int)
   extends Serializable {
 
+  @transient private lazy val client: CloudantClient = ClientBuilder
+.url(getClientUrl)
+.username(username)
+.password(password)
+.interceptors(Replay429Interceptor.WITH_DEFAULTS)
+.build
+  @transient private lazy val database: Database = client.database(dbName, false)
   lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName}
+  lazy val designDoc: String = {
+if (viewPath != null && viewPath.nonEmpty) {
+  viewPath.split("/")(1)
+} else {
+null
+}
+  }
+  lazy val searchName: String = {
+// verify that the index path matches '_design/ddoc/_search/searchname'
+if (indexPath != null && indexPath.nonEmpty && indexPath.matches("\\w+\\/\\w+\\/\\w+\\/\\w+")) {
+  val splitPath = indexPath.split(File.separator)
+  // return 'design-doc/search-name'
+  splitPath(1) + File.separator + splitPath(3)
+} else {
+  null
+}
+  }
+  lazy val viewName: String = {
+if (viewPath != null && viewPath.nonEmpty) {
+  val splitViewPath = viewPath.split(File.separator)
+  if(splitViewPath(3).contains("?")) {
+splitViewPath(3).substring(0, splitViewPath(3).indexOf("?"))
+  } else {
+splitViewPath(3)
+  }
+} else {
+  null
+}
+  }
 
   val pkField = "_id"
   val defaultIndex: String = endpoint
   val default_filter: String = "*:*"
 
-  def getDbUrl: String = {
-dbUrl
+  def buildAllDocsRequest(limit: Int, includeDocs: Boolean = true): AllDocsRequestBuilder = {
+var allDocsReq = database.getAllDocsRequestBuilder.includeDocs(includeDocs)
+if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
+  allDocsReq = allDocsReq.limit(limit)
+}
+allDocsReq
+  }
+
+  def buildViewRequest(limit: Int, includeDocs: Boolean = true):
+  UnpaginatedRequestBuilder[String, String] = {
+val viewReq = database.getViewRequestBuilder(designDoc, viewName)
+  .newRequest(Key.Type.STRING, classOf[String])
+  .includeDocs(includeDocs)
+if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
+  viewReq.limit(limit)
+}
+viewReq
+  }
+
+  def buildSearchRequest(limit: Int): SearchResult[JsonObject] = {
+val searchReq = database.search(searchName)
+if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
+  searchReq.limit(limit)
+}
+searchReq.querySearchResult(default_filter, classOf[JsonObject])
+  }
+
+  def executeRequest(stringUrl: String, postData: String = null): HttpConnection = {
+val url = new URL(stringUrl)
+if(postData != null) {
+  val conn = Http.POST(url, "application/json")
+  conn.setRequestBody(postData)

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

2018-01-05 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/61#discussion_r159889787
  
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala ---
@@ -191,20 +199,30 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
(0 until totalPartition).map(i => {
   val skip = i * limitPerPartition
   new JsonStoreRDDPartition(url, skip, limitPerPartition, i,
-  config, selector, fields, queryUsed).asInstanceOf[Partition]
+  config, selector, fields, queryUsed)
+.asInstanceOf[Partition]
 }).toArray
   }
 
   override def compute(splitIn: Partition, context: TaskContext):
   Iterator[String] = {
 val myPartition = splitIn.asInstanceOf[JsonStoreRDDPartition]
 implicit val postData : String = {
+  val jsonObject = new JsonObject
   if (myPartition.queryUsed && myPartition.fields != null) {
-Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" -> myPartition.fields,
-"limit" -> myPartition.limit, "skip" -> myPartition.skip))
+// Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" ->
+// myPartition.fields, "limit" -> myPartition.limit, "skip" -> myPartition.skip))
--- End diff --

remove


---


[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

2018-01-05 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/61#discussion_r159845032
  
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
@@ -16,34 +16,127 @@
  */
 package org.apache.bahir.cloudant
 
-import java.net.URLEncoder
+import java.net.{URL, URLEncoder}
 
-import play.api.libs.json.{JsArray, JsObject, Json, JsValue}
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.io.File
+
+import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database}
+import com.cloudant.client.api.model.SearchResult
+import com.cloudant.client.api.views._
+import com.cloudant.http.{Http, HttpConnection}
+import com.cloudant.http.interceptors.Replay429Interceptor
+import com.google.gson.{JsonObject, JsonParser}
 
 import org.apache.bahir.cloudant.common._
+import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter
 
 /*
 * Only allow one field pushdown now
 * as the filter today does not tell how to link the filters out And v.s. Or
 */
 
 class CloudantConfig(val protocol: String, val host: String,
- val dbName: String, val indexName: String, val viewName: String)
+ val dbName: String, val indexPath: String, val viewPath: String)
 (implicit val username: String, val password: String,
 val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
 val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
  val createDBOnSave: Boolean, val endpoint: String,
  val useQuery: Boolean = false, val queryLimit: Int)
   extends Serializable {
 
+  @transient private lazy val client: CloudantClient = ClientBuilder
+.url(getClientUrl)
+.username(username)
+.password(password)
+.interceptors(Replay429Interceptor.WITH_DEFAULTS)
--- End diff --

It may be worth adding an additional interceptor here to change/augment the UA.
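
For example, a minimal sketch of such an interceptor, assuming cloudant-http's HttpConnectionRequestInterceptor interface and the requestProperties map exposed by HttpConnection (the UA string itself is just an example):

    import com.cloudant.http.{HttpConnectionInterceptorContext, HttpConnectionRequestInterceptor}

    // Sets a custom User-Agent header on every outgoing request
    object UserAgentInterceptor extends HttpConnectionRequestInterceptor {
      override def interceptRequest(context: HttpConnectionInterceptorContext)
          : HttpConnectionInterceptorContext = {
        context.connection.requestProperties.put("User-Agent", "bahir-spark-sql-cloudant")
        context
      }
    }

    // usage: .interceptors(Replay429Interceptor.WITH_DEFAULTS, UserAgentInterceptor)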


---


[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

2018-01-05 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/61#discussion_r159847773
  
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
@@ -54,20 +147,21 @@ class CloudantConfig(val protocol: String, val host: String,
 createDBOnSave
   }
 
-  def getLastNum(result: JsValue): JsValue = (result \ "last_seq").get
+  def getClientUrl: URL = {
+new URL(protocol + "://" + host)
+  }
+
+  def getLastNum(result: JsonObject): JsonObject = result.get("last_seq").getAsJsonObject
 
   /* Url containing limit for docs in a Cloudant database.
   * If a view is not defined, use the _all_docs endpoint.
   * @return url with one doc limit for retrieving total doc count
   */
   def getUrl(limit: Int, excludeDDoc: Boolean = false): String = {
--- End diff --

The comment suggests that this method is used to get a URL with a `limit=1` for the purposes of getting a doc count, but it is not used by `getTotalDocCount`; it does, however, appear to be used by `getMany` with a different limit for query results.
FWIW I can't see `getTotalDocCount` being called anywhere in the new code either.


---


[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

2018-01-05 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/61#discussion_r159845874
  
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
@@ -16,34 +16,127 @@
  */
 package org.apache.bahir.cloudant
 
-import java.net.URLEncoder
+import java.net.{URL, URLEncoder}
 
-import play.api.libs.json.{JsArray, JsObject, Json, JsValue}
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.io.File
+
+import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database}
+import com.cloudant.client.api.model.SearchResult
+import com.cloudant.client.api.views._
+import com.cloudant.http.{Http, HttpConnection}
+import com.cloudant.http.interceptors.Replay429Interceptor
+import com.google.gson.{JsonObject, JsonParser}
 
 import org.apache.bahir.cloudant.common._
+import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter
 
 /*
 * Only allow one field pushdown now
 * as the filter today does not tell how to link the filters out And v.s. Or
 */
 
 class CloudantConfig(val protocol: String, val host: String,
- val dbName: String, val indexName: String, val viewName: String)
+ val dbName: String, val indexPath: String, val viewPath: String)
 (implicit val username: String, val password: String,
 val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
 val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
  val createDBOnSave: Boolean, val endpoint: String,
  val useQuery: Boolean = false, val queryLimit: Int)
   extends Serializable {
 
+  @transient private lazy val client: CloudantClient = ClientBuilder
+.url(getClientUrl)
+.username(username)
+.password(password)
+.interceptors(Replay429Interceptor.WITH_DEFAULTS)
+.build
+  @transient private lazy val database: Database = client.database(dbName, false)
   lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName}
+  lazy val designDoc: String = {
+if (viewPath != null && viewPath.nonEmpty) {
+  viewPath.split("/")(1)
+} else {
+null
+}
+  }
+  lazy val searchName: String = {
+// verify that the index path matches '_design/ddoc/_search/searchname'
+if (indexPath != null && indexPath.nonEmpty && indexPath.matches("\\w+\\/\\w+\\/\\w+\\/\\w+")) {
+  val splitPath = indexPath.split(File.separator)
+  // return 'design-doc/search-name'
+  splitPath(1) + File.separator + splitPath(3)
+} else {
+  null
+}
+  }
+  lazy val viewName: String = {
+if (viewPath != null && viewPath.nonEmpty) {
+  val splitViewPath = viewPath.split(File.separator)
+  if(splitViewPath(3).contains("?")) {
+splitViewPath(3).substring(0, splitViewPath(3).indexOf("?"))
+  } else {
+splitViewPath(3)
+  }
+} else {
+  null
+}
+  }
 
   val pkField = "_id"
   val defaultIndex: String = endpoint
   val default_filter: String = "*:*"
 
-  def getDbUrl: String = {
-dbUrl
+  def buildAllDocsRequest(limit: Int, includeDocs: Boolean = true): AllDocsRequestBuilder = {
+var allDocsReq = database.getAllDocsRequestBuilder.includeDocs(includeDocs)
+if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
+  allDocsReq = allDocsReq.limit(limit)
+}
+allDocsReq
+  }
+
+  def buildViewRequest(limit: Int, includeDocs: Boolean = true):
+  UnpaginatedRequestBuilder[String, String] = {
+val viewReq = database.getViewRequestBuilder(designDoc, viewName)
+  .newRequest(Key.Type.STRING, classOf[String])
+  .includeDocs(includeDocs)
+if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
+  viewReq.limit(limit)
+}
+viewReq
+  }
+
+  def buildSearchRequest(limit: Int): SearchResult[JsonObject] = {
+val searchReq = database.search(searchName)
+if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
+  searchReq.limit(limit)
+}
+searchReq.querySearchResult(default_filter, classOf[JsonObject])
+  }
+
+  def executeRequest(stringUrl: String, postData: String = null): HttpConnection = {
+val url = new URL(stringUrl)
+if(postData != null) {
+  val conn = Http.POST(url, "application/json")
+  conn.setRequestBody(postData)

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

2018-01-05 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/61#discussion_r159890761
  
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala ---
@@ -191,20 +199,30 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
(0 until totalPartition).map(i => {
   val skip = i * limitPerPartition
   new JsonStoreRDDPartition(url, skip, limitPerPartition, i,
-  config, selector, fields, queryUsed).asInstanceOf[Partition]
+  config, selector, fields, queryUsed)
+.asInstanceOf[Partition]
 }).toArray
   }
 
   override def compute(splitIn: Partition, context: TaskContext):
   Iterator[String] = {
 val myPartition = splitIn.asInstanceOf[JsonStoreRDDPartition]
 implicit val postData : String = {
+  val jsonObject = new JsonObject
   if (myPartition.queryUsed && myPartition.fields != null) {
-Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" -> myPartition.fields,
-"limit" -> myPartition.limit, "skip" -> myPartition.skip))
+// Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" ->
+// myPartition.fields, "limit" -> myPartition.limit, "skip" -> myPartition.skip))
+jsonObject.add("selector", myPartition.selector)
+jsonObject.add("fields", myPartition.fields)
+jsonObject.addProperty("skip", myPartition.skip)
+jsonObject.toString
   } else if (myPartition.queryUsed) {
-Json.stringify(Json.obj("selector" -> myPartition.selector, "limit" -> myPartition.limit,
-"skip" -> myPartition.skip))
+// Json.stringify(Json.obj("selector" -> myPartition.selector,
+// "limit" -> myPartition.limit, "skip" -> myPartition.skip))
+jsonObject.add("selector", myPartition.selector)
--- End diff --

Could add the `selector` and `skip` before the `if`/`else` and avoid the duplication
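
i.e. something along these lines (a sketch only; the gson calls mirror the ones already in the diff, and the types of the myPartition fields are assumed from it):

    import com.google.gson.JsonObject

    val jsonObject = new JsonObject
    // Common to both query branches, so added once up front
    jsonObject.add("selector", myPartition.selector)
    jsonObject.addProperty("limit", myPartition.limit)
    jsonObject.addProperty("skip", myPartition.skip)
    if (myPartition.fields != null) {
      jsonObject.add("fields", myPartition.fields)
    }
    jsonObject.toString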


---


[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

2018-01-05 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/61#discussion_r159889840
  
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala ---
@@ -191,20 +199,30 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
(0 until totalPartition).map(i => {
   val skip = i * limitPerPartition
   new JsonStoreRDDPartition(url, skip, limitPerPartition, i,
-  config, selector, fields, queryUsed).asInstanceOf[Partition]
+  config, selector, fields, queryUsed)
+.asInstanceOf[Partition]
 }).toArray
   }
 
   override def compute(splitIn: Partition, context: TaskContext):
   Iterator[String] = {
 val myPartition = splitIn.asInstanceOf[JsonStoreRDDPartition]
 implicit val postData : String = {
+  val jsonObject = new JsonObject
   if (myPartition.queryUsed && myPartition.fields != null) {
-Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" -> myPartition.fields,
-"limit" -> myPartition.limit, "skip" -> myPartition.skip))
+// Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" ->
+// myPartition.fields, "limit" -> myPartition.limit, "skip" -> myPartition.skip))
+jsonObject.add("selector", myPartition.selector)
+jsonObject.add("fields", myPartition.fields)
+jsonObject.addProperty("skip", myPartition.skip)
+jsonObject.toString
   } else if (myPartition.queryUsed) {
-Json.stringify(Json.obj("selector" -> myPartition.selector, "limit" -> myPartition.limit,
-"skip" -> myPartition.skip))
+// Json.stringify(Json.obj("selector" -> myPartition.selector,
+// "limit" -> myPartition.limit, "skip" -> myPartition.skip))
--- End diff --

remove


---


[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

2018-01-05 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/61#discussion_r159845855
  
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
@@ -16,34 +16,127 @@
  */
 package org.apache.bahir.cloudant
 
-import java.net.URLEncoder
+import java.net.{URL, URLEncoder}
 
-import play.api.libs.json.{JsArray, JsObject, Json, JsValue}
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.io.File
+
+import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database}
+import com.cloudant.client.api.model.SearchResult
+import com.cloudant.client.api.views._
+import com.cloudant.http.{Http, HttpConnection}
+import com.cloudant.http.interceptors.Replay429Interceptor
+import com.google.gson.{JsonObject, JsonParser}
 
 import org.apache.bahir.cloudant.common._
+import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter
 
 /*
 * Only allow one field pushdown now
 * as the filter today does not tell how to link the filters out And v.s. Or
 */
 
 class CloudantConfig(val protocol: String, val host: String,
- val dbName: String, val indexName: String, val viewName: String)
+ val dbName: String, val indexPath: String, val viewPath: String)
 (implicit val username: String, val password: String,
 val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
 val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
  val createDBOnSave: Boolean, val endpoint: String,
  val useQuery: Boolean = false, val queryLimit: Int)
   extends Serializable {
 
+  @transient private lazy val client: CloudantClient = ClientBuilder
+.url(getClientUrl)
+.username(username)
+.password(password)
+.interceptors(Replay429Interceptor.WITH_DEFAULTS)
+.build
+  @transient private lazy val database: Database = client.database(dbName, false)
   lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName}
+  lazy val designDoc: String = {
+if (viewPath != null && viewPath.nonEmpty) {
+  viewPath.split("/")(1)
+} else {
+null
+}
+  }
+  lazy val searchName: String = {
+// verify that the index path matches '_design/ddoc/_search/searchname'
+if (indexPath != null && indexPath.nonEmpty && indexPath.matches("\\w+\\/\\w+\\/\\w+\\/\\w+")) {
+  val splitPath = indexPath.split(File.separator)
+  // return 'design-doc/search-name'
+  splitPath(1) + File.separator + splitPath(3)
+} else {
+  null
+}
+  }
+  lazy val viewName: String = {
+if (viewPath != null && viewPath.nonEmpty) {
+  val splitViewPath = viewPath.split(File.separator)
+  if(splitViewPath(3).contains("?")) {
+splitViewPath(3).substring(0, splitViewPath(3).indexOf("?"))
+  } else {
+splitViewPath(3)
+  }
+} else {
+  null
+}
+  }
 
   val pkField = "_id"
   val defaultIndex: String = endpoint
   val default_filter: String = "*:*"
 
-  def getDbUrl: String = {
-dbUrl
+  def buildAllDocsRequest(limit: Int, includeDocs: Boolean = true): AllDocsRequestBuilder = {
+var allDocsReq = database.getAllDocsRequestBuilder.includeDocs(includeDocs)
+if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
+  allDocsReq = allDocsReq.limit(limit)
+}
+allDocsReq
+  }
+
+  def buildViewRequest(limit: Int, includeDocs: Boolean = true):
+  UnpaginatedRequestBuilder[String, String] = {
+val viewReq = database.getViewRequestBuilder(designDoc, viewName)
+  .newRequest(Key.Type.STRING, classOf[String])
+  .includeDocs(includeDocs)
+if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
+  viewReq.limit(limit)
+}
+viewReq
+  }
+
+  def buildSearchRequest(limit: Int): SearchResult[JsonObject] = {
+val searchReq = database.search(searchName)
+if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
+  searchReq.limit(limit)
+}
+searchReq.querySearchResult(default_filter, classOf[JsonObject])
+  }
+
+  def executeRequest(stringUrl: String, postData: String = null): HttpConnection = {
+val url = new URL(stringUrl)
+if(postData != null) {
+  val conn = Http.POST(url, "application/json")
+  conn.setRequestBody(postData)

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

2018-01-05 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/61#discussion_r159889644
  
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala ---
@@ -174,7 +178,11 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
 
 implicit val postData : String = {
   if (queryUsed) {
-Json.stringify(Json.obj("selector" -> selector, "limit" -> 1))
+val jsonSelector = new JsonObject
+jsonSelector.addProperty("selector", selector.toString)
+jsonSelector.addProperty("limit", 1)
+jsonSelector.toString
+// Json.stringify(Json.obj("selector" -> selector, "limit" -> 1))
--- End diff --

remove


---


[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

2018-01-05 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/61#discussion_r159889542
  
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala ---
@@ -101,24 +105,24 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
   case LessThanOrEqual(attr, v) => ("$lte", v)
   case _ => (null, null)
 }
-val convertedV: JsValue = {
+val convertedV: JsonElement = {
   // TODO Better handing of other types
   if (value != null) {
 value match {
-  case s: String => Json.toJson(s)
-  case l: Long => Json.toJson(l)
-  case d: Double => Json.toJson(d)
-  case i: Int => Json.toJson(i)
-  case b: Boolean => Json.toJson(b)
-  case t: java.sql.Timestamp => Json.toJson(t)
+  case s: String => parser.parse(s)// Json.toJson(s)
--- End diff --

some old code here that should probably be removed (and on the following line)


---


[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

2018-01-05 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/61#discussion_r159845718
  
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
@@ -16,34 +16,127 @@
  */
 package org.apache.bahir.cloudant
 
-import java.net.URLEncoder
+import java.net.{URL, URLEncoder}
 
-import play.api.libs.json.{JsArray, JsObject, Json, JsValue}
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.io.File
+
+import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database}
+import com.cloudant.client.api.model.SearchResult
+import com.cloudant.client.api.views._
+import com.cloudant.http.{Http, HttpConnection}
+import com.cloudant.http.interceptors.Replay429Interceptor
+import com.google.gson.{JsonObject, JsonParser}
 
 import org.apache.bahir.cloudant.common._
+import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter
 
 /*
 * Only allow one field pushdown now
 * as the filter today does not tell how to link the filters out And v.s. Or
 */
 
 class CloudantConfig(val protocol: String, val host: String,
- val dbName: String, val indexName: String, val viewName: String)
+ val dbName: String, val indexPath: String, val viewPath: String)
 (implicit val username: String, val password: String,
 val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
 val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
  val createDBOnSave: Boolean, val endpoint: String,
  val useQuery: Boolean = false, val queryLimit: Int)
   extends Serializable {
 
+  @transient private lazy val client: CloudantClient = ClientBuilder
+.url(getClientUrl)
+.username(username)
+.password(password)
+.interceptors(Replay429Interceptor.WITH_DEFAULTS)
+.build
+  @transient private lazy val database: Database = client.database(dbName, false)
   lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName}
+  lazy val designDoc: String = {
+if (viewPath != null && viewPath.nonEmpty) {
+  viewPath.split("/")(1)
+} else {
+null
+}
+  }
+  lazy val searchName: String = {
+// verify that the index path matches '_design/ddoc/_search/searchname'
+if (indexPath != null && indexPath.nonEmpty && indexPath.matches("\\w+\\/\\w+\\/\\w+\\/\\w+")) {
+  val splitPath = indexPath.split(File.separator)
+  // return 'design-doc/search-name'
+  splitPath(1) + File.separator + splitPath(3)
+} else {
+  null
+}
+  }
+  lazy val viewName: String = {
+if (viewPath != null && viewPath.nonEmpty) {
+  val splitViewPath = viewPath.split(File.separator)
+  if(splitViewPath(3).contains("?")) {
+splitViewPath(3).substring(0, splitViewPath(3).indexOf("?"))
+  } else {
+splitViewPath(3)
+  }
+} else {
+  null
+}
+  }
 
   val pkField = "_id"
   val defaultIndex: String = endpoint
   val default_filter: String = "*:*"
 
-  def getDbUrl: String = {
-dbUrl
+  def buildAllDocsRequest(limit: Int, includeDocs: Boolean = true): AllDocsRequestBuilder = {
+var allDocsReq = database.getAllDocsRequestBuilder.includeDocs(includeDocs)
+if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
+  allDocsReq = allDocsReq.limit(limit)
+}
+allDocsReq
+  }
+
+  def buildViewRequest(limit: Int, includeDocs: Boolean = true):
+  UnpaginatedRequestBuilder[String, String] = {
+val viewReq = database.getViewRequestBuilder(designDoc, viewName)
+  .newRequest(Key.Type.STRING, classOf[String])
+  .includeDocs(includeDocs)
+if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
+  viewReq.limit(limit)
+}
+viewReq
+  }
+
+  def buildSearchRequest(limit: Int): SearchResult[JsonObject] = {
+val searchReq = database.search(searchName)
+if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
+  searchReq.limit(limit)
+}
+searchReq.querySearchResult(default_filter, classOf[JsonObject])
+  }
+
+  def executeRequest(stringUrl: String, postData: String = null): HttpConnection = {
+val url = new URL(stringUrl)
+if(postData != null) {
+  val conn = Http.POST(url, "application/json")
+  conn.setRequestBody(postData)
 

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

2018-01-05 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/61#discussion_r159848274
  
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
@@ -95,15 +189,49 @@ class CloudantConfig(val protocol: String, val host: String,
 }
   }
 
+  def getTotalDocCount: Int = {
+val limit = 1
+if (viewPath != null) {
+  // "limit=" + limit + "&skip=" + skip
+  buildViewRequest(limit, includeDocs = false).build().getResponse.getTotalRowCount.toInt
+} else {
+  // /_all_docs?limit=1
+  // Note: java-cloudant's AllDocsRequest doesn't have a getTotalRowCount method
+  // buildAllDocsRequest(1, includeDocs = false).build().getResponse.getTotalRowCount.toInt
+  val response = client.executeRequest(Http.GET(
+new URL(database.getDBUri + File.separator + endpoint + "?limit=" + limit)))
+  getResultTotalRows(response.responseAsString)
+}
+  }
+
+  def getDocs(limit: Int): List[JsonObject] = {
+if (viewPath != null) {
+  // "limit=" + limit + "&skip=" + skip
--- End diff --

remove?


---


[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

2018-01-04 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/61#discussion_r159709935
  
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
@@ -95,15 +189,49 @@ class CloudantConfig(val protocol: String, val host: String,
 }
   }
 
+  def getTotalDocCount: Int = {
+val limit = 1
+if (viewPath != null) {
+  // "limit=" + limit + "&skip=" + skip
+  buildViewRequest(limit, includeDocs = false).build().getResponse.getTotalRowCount.toInt
+} else {
+  // /_all_docs?limit=1
+  // Note: java-cloudant's AllDocsRequest doesn't have a getTotalRowCount method
+  // buildAllDocsRequest(1, includeDocs = false).build().getResponse.getTotalRowCount.toInt
+  val response = client.executeRequest(Http.GET(
+new URL(database.getDBUri + File.separator + endpoint + "?limit=" + limit)))
+  getResultTotalRows(response.responseAsString)
+}
+  }
+
+  def getDocs(limit: Int): List[JsonObject] = {
+if (viewPath != null) {
+  // "limit=" + limit + "&skip=" + skip
+  buildViewRequest(limit).build().getResponse.getDocsAs(classOf[JsonObject]).asScala.toList
+} else if (indexPath != null) {
+  var searchDocs = mutable.ListBuffer[JsonObject]()
+  for (result: SearchResult[JsonObject]#SearchResultRow <-
+   buildSearchRequest(limit).getRows.asScala) {
+searchDocs += result.getDoc
+  }
+  searchDocs.toList
+} else {
+  // /_all_docs?limit=1
+  // val response = client.executeRequest(Http.GET(
--- End diff --

Remove commented out code?


---


[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

2018-01-04 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/61#discussion_r159699798
  
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
@@ -16,34 +16,127 @@
  */
 package org.apache.bahir.cloudant
 
-import java.net.URLEncoder
+import java.net.{URL, URLEncoder}
 
-import play.api.libs.json.{JsArray, JsObject, Json, JsValue}
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.io.File
+
+import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database}
+import com.cloudant.client.api.model.SearchResult
+import com.cloudant.client.api.views._
+import com.cloudant.http.{Http, HttpConnection}
+import com.cloudant.http.interceptors.Replay429Interceptor
+import com.google.gson.{JsonObject, JsonParser}
 
 import org.apache.bahir.cloudant.common._
+import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter
 
 /*
 * Only allow one field pushdown now
 * as the filter today does not tell how to link the filters out And v.s. Or
 */
 
 class CloudantConfig(val protocol: String, val host: String,
- val dbName: String, val indexName: String, val viewName: String)
+ val dbName: String, val indexPath: String, val viewPath: String)
 (implicit val username: String, val password: String,
 val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
 val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
  val createDBOnSave: Boolean, val endpoint: String,
  val useQuery: Boolean = false, val queryLimit: Int)
   extends Serializable {
 
+  @transient private lazy val client: CloudantClient = ClientBuilder
+.url(getClientUrl)
+.username(username)
+.password(password)
+.interceptors(Replay429Interceptor.WITH_DEFAULTS)
+.build
+  @transient private lazy val database: Database = client.database(dbName, false)
   lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName}
+  lazy val designDoc: String = {
+if (viewPath != null && viewPath.nonEmpty) {
+  viewPath.split("/")(1)
+} else {
+null
+}
+  }
+  lazy val searchName: String = {
+// verify that the index path matches '_design/ddoc/_search/searchname'
+if (indexPath != null && indexPath.nonEmpty && indexPath.matches("\\w+\\/\\w+\\/\\w+\\/\\w+")) {
+  val splitPath = indexPath.split(File.separator)
--- End diff --

If you separate out the regex pattern from earlier you could use the captured groups to extract the design doc ID and search index name without needing to do more splits here.
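
For example (a sketch; the group layout assumes the '_design/ddoc/_search/searchname' shape checked above):

    // Validate and extract in one step with capture groups
    val SearchPath = raw"(\w+)/(\w+)/(\w+)/(\w+)".r

    val searchName: String = indexPath match {
      case SearchPath(_, ddoc, _, search) => ddoc + "/" + search
      case _ => null
    }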


---


[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

2018-01-04 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/61#discussion_r159709382
  
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
@@ -95,15 +189,49 @@ class CloudantConfig(val protocol: String, val host: String,
 }
   }
 
+  def getTotalDocCount: Int = {
+val limit = 1
+if (viewPath != null) {
+  // "limit=" + limit + "&skip=" + skip
+  buildViewRequest(limit, includeDocs = false).build().getResponse.getTotalRowCount.toInt
+} else {
+  // /_all_docs?limit=1
+  // Note: java-cloudant's AllDocsRequest doesn't have a getTotalRowCount method
+  // buildAllDocsRequest(1, includeDocs = false).build().getResponse.getTotalRowCount.toInt
+  val response = client.executeRequest(Http.GET(
+new URL(database.getDBUri + File.separator + endpoint + "?limit=" + limit)))
+  getResultTotalRows(response.responseAsString)
--- End diff --

It might be easier to use `com.cloudant.client.api.model.DbInfo#getDocCount` instead of trying to do it via `_all_docs`
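
i.e., assuming java-cloudant's Database#info returning a DbInfo, something like:

    // Document count straight from the database metadata rather than _all_docs
    val docCount: Long = database.info().getDocCount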


---


[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

2018-01-04 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/61#discussion_r159699042
  
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
@@ -16,34 +16,127 @@
  */
 package org.apache.bahir.cloudant
 
-import java.net.URLEncoder
+import java.net.{URL, URLEncoder}
 
-import play.api.libs.json.{JsArray, JsObject, Json, JsValue}
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.io.File
+
+import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database}
+import com.cloudant.client.api.model.SearchResult
+import com.cloudant.client.api.views._
+import com.cloudant.http.{Http, HttpConnection}
+import com.cloudant.http.interceptors.Replay429Interceptor
+import com.google.gson.{JsonObject, JsonParser}
 
 import org.apache.bahir.cloudant.common._
+import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter
 
 /*
 * Only allow one field pushdown now
 * as the filter today does not tell how to link the filters out And v.s. Or
 */
 
 class CloudantConfig(val protocol: String, val host: String,
- val dbName: String, val indexName: String, val viewName: String)
+ val dbName: String, val indexPath: String, val viewPath: String)
 (implicit val username: String, val password: String,
 val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
 val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
  val createDBOnSave: Boolean, val endpoint: String,
  val useQuery: Boolean = false, val queryLimit: Int)
   extends Serializable {
 
+  @transient private lazy val client: CloudantClient = ClientBuilder
+.url(getClientUrl)
+.username(username)
+.password(password)
+.interceptors(Replay429Interceptor.WITH_DEFAULTS)
+.build
+  @transient private lazy val database: Database = client.database(dbName, false)
   lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName}
+  lazy val designDoc: String = {
+if (viewPath != null && viewPath.nonEmpty) {
+  viewPath.split("/")(1)
+} else {
+null
+}
+  }
+  lazy val searchName: String = {
+// verify that the index path matches '_design/ddoc/_search/searchname'
+if (indexPath != null && indexPath.nonEmpty && indexPath.matches("\\w+\\/\\w+\\/\\w+\\/\\w+")) {
--- End diff --

I think you could leverage scala's raw interpolator here to make the regex a bit less escape-y:
`raw"\w+\/\w+\/\w+\/\w+"`

Also are word characters sufficient here? Aren't there some other characters that could be part of a design document id and allowed unencoded in a URL path (e.g. maybe `@`?) that would be excluded by this regex? I'd be inclined to maybe use something like: `_design\/([^\/]+)\/_search\/([^\/]+)`
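
For instance, a small sketch of that suggestion (the sample IDs are made up, and the character class is written as [^/]+ on the assumption that a negated class was intended):

    // raw interpolation avoids double escaping; [^/]+ accepts any
    // non-slash characters, so ddoc IDs containing e.g. '@' still match
    val SearchIndexPath = raw"_design/([^/]+)/_search/([^/]+)".r

    "_design/an@ddoc/_search/by-name" match {
      case SearchIndexPath(ddoc, search) => println(s"ddoc=$ddoc, search=$search")
      case _ => println("not a search index path")
    }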


---


[GitHub] bahir pull request #57: [BAHIR-128][WIP] fix failing sql-cloudant test

2017-12-18 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/57#discussion_r157480164
  
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala ---
@@ -125,7 +125,7 @@ class DefaultSource extends RelationProvider
   /* Create a streaming context to handle transforming docs in
   * larger databases into Spark datasets
   */
-  val ssc = new StreamingContext(sqlContext.sparkContext, Seconds(10))
+  val ssc = new StreamingContext(sqlContext.sparkContext, Seconds(8))
--- End diff --

I'm amazed this parameter was hard-coded as it seems to be a fairly critical tuning parameter for streaming. I guess this is one for another PR.
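
If it were made tunable, it might look something like this (the "cloudant.batchInterval" option name is hypothetical, and sqlContext is assumed from the surrounding DefaultSource scope):

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Fall back to the current hard-coded value when the option is unset
    val batchInterval = sqlContext.sparkContext.getConf
      .getInt("cloudant.batchInterval", 8)
    val ssc = new StreamingContext(sqlContext.sparkContext, Seconds(batchInterval))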


---


[GitHub] bahir pull request #57: [BAHIR-128][WIP] fix failing sql-cloudant test

2017-12-18 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/57#discussion_r157484722
  
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala ---
@@ -39,56 +37,38 @@ class ChangesReceiver(config: CloudantChangesConfig)
   }
 
   private def receive(): Unit = {
-// Get total number of docs in database using _all_docs endpoint
-val limit = new JsonStoreDataAccess(config)
-  .getTotalRows(config.getTotalUrl, queryUsed = false)
-
-// Get continuous _changes url
+// Get normal _changes url
--- End diff --

I'm a bit confused about this change. Since Spark Streaming is the basis for "real-time" or "continuous applications", doesn't this need to keep listening to the changes feed to wait for more changes?


---


[GitHub] bahir pull request #57: [BAHIR-128][WIP] fix failing sql-cloudant test

2017-12-18 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/57#discussion_r157478713
  
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala ---
@@ -67,7 +67,8 @@ class CloudantChangesConfig(protocol: String, host: String, dbName: String,
   }
 
   def getChangesReceiverUrl: String = {
-var url = dbUrl + "/" + defaultIndex + "?include_docs=true&feed=continuous&timeout=" + timeout
+var url = dbUrl + "/" + defaultIndex + "?include_docs=true&feed=normal" +
+  "&seq_interval=1&timeout=" + timeout
--- End diff --

WDYT about making the `seq_interval` the `bulkSize` instead of hard-coding?
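
For illustration, a sketch of that suggestion (assuming bulkSize is reachable from CloudantChangesConfig):

    def getChangesReceiverUrl: String = {
      dbUrl + "/" + defaultIndex + "?include_docs=true&feed=normal" +
        "&seq_interval=" + bulkSize + "&timeout=" + timeout
    }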


---


[GitHub] bahir pull request #57: [BAHIR-128][WIP] fix failing sql-cloudant test

2017-12-18 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/57#discussion_r157479639
  
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala ---
@@ -39,56 +37,38 @@ class ChangesReceiver(config: CloudantChangesConfig)
   }
 
   private def receive(): Unit = {
-// Get total number of docs in database using _all_docs endpoint
-val limit = new JsonStoreDataAccess(config)
-  .getTotalRows(config.getTotalUrl, queryUsed = false)
-
-// Get continuous _changes url
+// Get normal _changes url
 val url = config.getChangesReceiverUrl.toString
 val selector: String = {
   "{\"selector\":" + config.getSelector + "}"
 }
 
-var count = 0
+// var count = 0
--- End diff --

delete?


---


[GitHub] bahir-website pull request #13: Fixed typos in Apache CouchDB

2017-11-17 Thread ricellis
GitHub user ricellis opened a pull request:

https://github.com/apache/bahir-website/pull/13

Fixed typos in Apache CouchDB

Apache CouchDB is incorrectly referred to as `CounchDB` twice on the website.
This PR corrects those typos.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ricellis/bahir-website couchdb-typos

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/bahir-website/pull/13.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #13


commit b18d0ed860e53df588f09237deed30805b9d1e49
Author: Rich Ellis <ricel...@users.noreply.github.com>
Date:   2017-11-17T09:25:50Z

Fixed typos in Apache CouchDB




---


[GitHub] bahir pull request #45: [WIP] [BAHIR-110] Implement _changes API for non-str...

2017-07-10 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/45#discussion_r126401728
  
--- Diff: sql-cloudant/README.md ---
@@ -31,15 +31,14 @@ The `--packages` argument can also be used with `bin/spark-submit`.
 
 Submit a job in Python:
 
-spark-submit --master local[4] --jars <path to cloudant-spark.jar> <path to python script>

+spark-submit --master local[4] --packages org.apache.bahir:spark-sql-cloudant_2.11:2.2.0-SNAPSHOT <path to python script>
 
 Submit a job in Scala:
 
-   spark-submit --class "<your-class>" --master local[4] --jars <path to cloudant-spark.jar>
+   spark-submit --class "<your-class>" --master local[4] --packages org.apache.bahir:spark-sql-cloudant_2.11:2.2.0-SNAPSHOT
--- End diff --

`SNAPSHOT` again?


---


[GitHub] bahir pull request #45: [WIP] [BAHIR-110] Implement _changes API for non-str...

2017-07-10 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/45#discussion_r126402346
  
--- Diff: sql-cloudant/README.md ---
@@ -52,39 +51,71 @@ Here each subsequent configuration overrides the previous one. Thus, configurati
 
 
 ### Configuration in application.conf
-Default values are defined in [here](cloudant-spark-sql/src/main/resources/application.conf).
+Default values are defined in [here](src/main/resources/application.conf).
 
 ### Configuration on SparkConf
 
 Name | Default | Meaning
 --- |:---:| ---
+cloudant.apiReceiver|"_all_docs"| API endpoint for RelationProvider when loading or saving data from Cloudant to DataFrames or SQL temporary tables. Select between "_all_docs" or "_changes" endpoint.
--- End diff --

I would say
>Cloudant API endpoint to use for

or
>Select between the Cloudant `_all_docs` or `_changes` endpoint

Probably also worth a "see below for details|notes|further explanation" or similar to help people find the content that would help them choose the appropriate endpoint.


---


[GitHub] bahir pull request #45: [WIP] [BAHIR-110] Implement _changes API for non-str...

2017-07-10 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/45#discussion_r126438628
  
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala ---
@@ -98,29 +99,89 @@ class DefaultSource extends RelationProvider
 
  val config: CloudantConfig = JsonStoreConfigManager.getConfig(sqlContext, parameters)
 
-  var allDocsDF: DataFrame = null
+  var dataFrame: DataFrame = null
 
   val schema: StructType = {
 if (inSchema != null) {
   inSchema
-} else {
-  val df = if (config.getSchemaSampleSize() ==
-JsonStoreConfigManager.ALL_DOCS_LIMIT &&
+} else if (!config.isInstanceOf[CloudantChangesConfig]
+  || config.viewName != null || config.indexName != null) {
+  val df = if (config.getSchemaSampleSize ==
+JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT &&
 config.viewName == null
 && config.indexName == null) {
val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
-allDocsDF = sqlContext.read.json(cloudantRDD)
-allDocsDF
+dataFrame = sqlContext.read.json(cloudantRDD)
+dataFrame
   } else {
 val dataAccess = new JsonStoreDataAccess(config)
 val aRDD = sqlContext.sparkContext.parallelize(
-dataAccess.getMany(config.getSchemaSampleSize()))
+dataAccess.getMany(config.getSchemaSampleSize))
 sqlContext.read.json(aRDD)
   }
   df.schema
+} else {
+  /* Create a streaming context to handle transforming docs in
+  * larger databases into Spark datasets
+  */
+  /* Allow the raw data and persisted RDDs to be accessible outside
+  * of the streaming context.
+  * See https://spark.apache.org/docs/latest/configuration.html
+  * for more details.
+  */
+  sqlContext.sparkSession.conf.set("spark.streaming.unpersist", "false")
+
+  val ssc = new StreamingContext(sqlContext.sparkContext, Seconds(10))
+  val streamingMap = {
+val selector = config.asInstanceOf[CloudantChangesConfig].getSelector
+if (selector != null) {
+  Map(
+"database" -> config.getDbname,
+"selector" -> selector
+  )
+} else {
+  Map(
+"database" -> config.getDbname
+  )
+}
+  }
+
+  val changes = ssc.receiverStream(
+new CloudantReceiver(sqlContext.sparkContext.getConf, streamingMap))
+  changes.persist(config.asInstanceOf[CloudantChangesConfig]
+.getStorageLevelForStreaming)
+
+  // Global RDD that's created from union of all RDDs
+  var globalRDD = ssc.sparkContext.emptyRDD[String]
+
+  logger.info("Loading data from Cloudant using "
++ config.asInstanceOf[CloudantChangesConfig].getContinuousChangesUrl)
+
+  // Collect and union each RDD to convert all RDDs to a DataFrame
+  changes.foreachRDD((rdd: RDD[String]) => {
+if (!rdd.isEmpty()) {
+  if (globalRDD != null) {
+// Union RDDs in foreach loop
+globalRDD = globalRDD.union(rdd)
+  } else {
+globalRDD = rdd
+  }
+} else {
+  // Convert final global RDD[String] to DataFrame
+  dataFrame = sqlContext.sparkSession.read.json(globalRDD)
+  ssc.stop(stopSparkContext = false, stopGracefully = false)
+}
+  })
+
+  ssc.start
+  // run streaming until all docs from continuous feed are received
+  ssc.awaitTermination
+  // ssc.stop(stopSparkContext = false, stopGracefully = false)
--- End diff --

Commented out code?


---


[GitHub] bahir pull request #45: [WIP] [BAHIR-110] Implement _changes API for non-str...

2017-07-10 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/45#discussion_r126435531
  
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala ---
@@ -16,23 +16,20 @@
  */
 package org.apache.bahir.cloudant
 
-// scalastyle:off
-import scalaj.http._
-
 import play.api.libs.json.Json
+import scalaj.http._
 
+import org.apache.spark.SparkConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.SparkConf
 
 import org.apache.bahir.cloudant.common._
-// scalastyle:on
 
 class CloudantReceiver(sparkConf: SparkConf, cloudantParams: Map[String, String])
 extends Receiver[String](StorageLevel.MEMORY_AND_DISK) {
-  lazy val config: CloudantConfig = {
+  lazy val config: CloudantChangesConfig = {
--- End diff --

Would this class be better named `CloudantChangesReceiver`, since it doesn't appear to be a general case?


---


[GitHub] bahir pull request #45: [WIP] [BAHIR-110] Implement _changes API for non-str...

2017-07-10 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/45#discussion_r126409663
  
--- Diff: sql-cloudant/README.md ---
@@ -52,39 +51,71 @@ Here each subsequent configuration overrides the previous one. Thus, configurati
 
 
 ### Configuration in application.conf
-Default values are defined in [here](cloudant-spark-sql/src/main/resources/application.conf).
+Default values are defined in [here](src/main/resources/application.conf).
 
 ### Configuration on SparkConf
 
 Name | Default | Meaning
 --- |:---:| ---
+cloudant.apiReceiver|"_all_docs"| API endpoint for RelationProvider when loading or saving data from Cloudant to DataFrames or SQL temporary tables. Select between "_all_docs" or "_changes" endpoint.
 cloudant.protocol|https|protocol to use to transfer data: http or https
-cloudant.host||cloudant host url
-cloudant.username||cloudant userid
-cloudant.password||cloudant password
+cloudant.host| |cloudant host url
+cloudant.username| |cloudant userid
+cloudant.password| |cloudant password
 cloudant.useQuery|false|By default, _all_docs endpoint is used if configuration 'view' and 'index' (see below) are not set. When useQuery is enabled, _find endpoint will be used in place of _all_docs when query condition is not on primary key field (_id), so that query predicates may be driven into datastore.
 cloudant.queryLimit|25|The maximum number of results returned when querying the _find endpoint.
 jsonstore.rdd.partitions|10|the number of partitions intent used to drive JsonStoreRDD loading query result in parallel. The actual number is calculated based on total rows returned and satisfying maxInPartition and minInPartition
 jsonstore.rdd.maxInPartition|-1|the max rows in a partition. -1 means unlimited
 jsonstore.rdd.minInPartition|10|the min rows in a partition.
 jsonstore.rdd.requestTimeout|90| the request timeout in milliseconds
 bulkSize|200| the bulk save size
-schemaSampleSize| "-1" | the sample size for RDD schema discovery. 1 means we are using only first document for schema discovery; -1 means all documents; 0 will be treated as 1; any number N means min(N, total) docs
-createDBOnSave|"false"| whether to create a new database during save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with a provided name already exists, an error will be raised.
+schemaSampleSize|-1| the sample size for RDD schema discovery. 1 means we are using only first document for schema discovery; -1 means all documents; 0 will be treated as 1; any number N means min(N, total) docs
+createDBOnSave|false| whether to create a new database during save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with a provided name already exists, an error will be raised.
+
+The `cloudant.apiReceiver` option allows for _changes or _all_docs API endpoint to be called while loading Cloudant data into Spark DataFrames or SQL Tables,
+or saving data from DataFrames or SQL Tables to a Cloudant database.
+
+**Note:** When using `_changes` API, please consider:
+1. Results are partially ordered and may not be presented in order in which documents were updated.
+2. In case of shards' unavailability, you may see duplicate results (changes that have been seen already)
+3. Can use `selector` option to filter Cloudant docs during load
+4. Supports a real snapshot of the database and represents it in a single point of time.
+5. Only supports single threaded
+
+
+When using `_all_docs` API:
+1. Supports parallel reads (using offset and range)
+2. Using partitions may not represent the true snapshot of a database. Some docs
+   may be added or deleted in the database between loading data into different
+   Spark partitions.
+
+Performance of `_changes` API is still better in most cases (even with single threaded support).
+During several performance tests using 200 MB to 15 GB Cloudant databases, load time from Cloudant to Spark using
+`_changes` feed was faster to complete every time compared to `_all_docs`.
+
+See [CloudantChangesDFSuite](src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala)
+for examples of loading data into a Spark DataFrame with `_changes` API.
 
 ### Configuration on Spark SQL Temporary Table or DataFrame
 
 Besides all the configurations passed to a temporary table or dataframe through SparkConf, it is also possible to set the following configurations in temporary table or dataframe using OPTIONS:
 
 Name | Default | Meaning
 --- |:---:| ---
-database||cloudant database name

[GitHub] bahir pull request #45: [WIP] [BAHIR-110] Implement _changes API for non-str...

2017-07-10 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/45#discussion_r126428690
  
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
@@ -30,81 +28,83 @@ import org.apache.bahir.cloudant.common._
 */
 
 class CloudantConfig(val protocol: String, val host: String,
-val dbName: String, val indexName: String = null, val viewName: String = null)
+val dbName: String, val indexName: String, val viewName: String)
 (implicit val username: String, val password: String,
 val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
 val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
-val createDBOnSave: Boolean, val selector: String, val useQuery: Boolean = false,
-val queryLimit: Int)
-extends Serializable{
+val createDBOnSave: Boolean, val apiReceiver: String,
+val useQuery: Boolean = false, val queryLimit: Int)
+extends Serializable {
 
-  private lazy val dbUrl = {protocol + "://" + host + "/" + dbName}
+  lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName}
 
   val pkField = "_id"
-  val defaultIndex = "_all_docs" // "_changes" does not work for partition
+  val defaultIndex: String = apiReceiver
   val default_filter: String = "*:*"
 
-  def getContinuousChangesUrl(): String = {
-var url = dbUrl + "/_changes?include_docs=true&feed=continuous&heartbeat=3000"
-if (selector != null) {
-  url = url + "&filter=_selector"
-}
-url
-  }
-
-  def getSelector() : String = {
-selector
-  }
-
-  def getDbUrl(): String = {
+  def getDbUrl: String = {
 dbUrl
   }
 
-  def getSchemaSampleSize(): Int = {
+  def getSchemaSampleSize: Int = {
 schemaSampleSize
   }
 
-  def getCreateDBonSave(): Boolean = {
+  def getCreateDBonSave: Boolean = {
 createDBOnSave
   }
 
-  def getTotalUrl(url: String): String = {
-if (url.contains('?')) {
-  url + "&limit=1"
-} else {
-  url + "?limit=1"
-}
-  }
-
-  def getDbname(): String = {
-dbName
-  }
-
-  def queryEnabled(): Boolean = {useQuery && indexName==null && viewName==null}
-
-  def allowPartition(queryUsed: Boolean): Boolean = {indexName==null && !queryUsed}
-
-  def getAllDocsUrl(limit: Int, excludeDDoc: Boolean = false): String = {
+  def getLastNum(result: JsValue): JsValue = (result \ "last_seq").get
 
+  /* Url containing limit for docs in a Cloudant database.
+  * If a view is not defined, use the _all_docs endpoint.
+  * @return url with one doc limit for retrieving total doc count
+  */
+  def getUrl(limit: Int, excludeDDoc: Boolean = false): String = {
 if (viewName == null) {
-  val baseUrl = (
-  if ( excludeDDoc) dbUrl + "/_all_docs?startkey=%22_design0/%22&include_docs=true"
-  else dbUrl + "/_all_docs?include_docs=true"
-  )
-  if (limit == JsonStoreConfigManager.ALL_DOCS_LIMIT) {
+  val baseUrl = {
+if (excludeDDoc) {
+  dbUrl + "/_all_docs?startkey=%22_design0/%22&include_docs=true"
--- End diff --

See https://github.com/cloudant/java-cloudant/issues/344#issuecomment-276938689
This `startkey` makes the assumption that no document IDs will start with upper case letters. Possibly one for another issue.


---


[GitHub] bahir pull request #45: [WIP] [BAHIR-110] Implement _changes API for non-str...

2017-07-10 Thread ricellis
Github user ricellis commented on a diff in the pull request:

https://github.com/apache/bahir/pull/45#discussion_r126402723
  
--- Diff: sql-cloudant/README.md ---
@@ -52,39 +51,71 @@ Here each subsequent configuration overrides the previous one. Thus, configurati
 
 
 ### Configuration in application.conf
-Default values are defined in [here](cloudant-spark-sql/src/main/resources/application.conf).
+Default values are defined in [here](src/main/resources/application.conf).
 
 ### Configuration on SparkConf
 
 Name | Default | Meaning
 --- |:---:| ---
+cloudant.apiReceiver|"_all_docs"| API endpoint for RelationProvider when loading or saving data from Cloudant to DataFrames or SQL temporary tables. Select between "_all_docs" or "_changes" endpoint.
 cloudant.protocol|https|protocol to use to transfer data: http or https
-cloudant.host||cloudant host url
-cloudant.username||cloudant userid
-cloudant.password||cloudant password
+cloudant.host| |cloudant host url
+cloudant.username| |cloudant userid
+cloudant.password| |cloudant password
 cloudant.useQuery|false|By default, _all_docs endpoint is used if configuration 'view' and 'index' (see below) are not set. When useQuery is enabled, _find endpoint will be used in place of _all_docs when query condition is not on primary key field (_id), so that query predicates may be driven into datastore.
 cloudant.queryLimit|25|The maximum number of results returned when querying the _find endpoint.
 jsonstore.rdd.partitions|10|the number of partitions intent used to drive JsonStoreRDD loading query result in parallel. The actual number is calculated based on total rows returned and satisfying maxInPartition and minInPartition
 jsonstore.rdd.maxInPartition|-1|the max rows in a partition. -1 means unlimited
 jsonstore.rdd.minInPartition|10|the min rows in a partition.
 jsonstore.rdd.requestTimeout|90| the request timeout in milliseconds
 bulkSize|200| the bulk save size
-schemaSampleSize| "-1" | the sample size for RDD schema discovery. 1 means we are using only first document for schema discovery; -1 means all documents; 0 will be treated as 1; any number N means min(N, total) docs
-createDBOnSave|"false"| whether to create a new database during save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with a provided name already exists, an error will be raised.
+schemaSampleSize|-1| the sample size for RDD schema discovery. 1 means we are using only first document for schema discovery; -1 means all documents; 0 will be treated as 1; any number N means min(N, total) docs
--- End diff --

using only **the** first document


---