[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/61#discussion_r160470451 --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala --- @@ -16,34 +16,127 @@ */ package org.apache.bahir.cloudant -import java.net.URLEncoder +import java.net.{URL, URLEncoder} -import play.api.libs.json.{JsArray, JsObject, Json, JsValue} +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.reflect.io.File + +import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database} +import com.cloudant.client.api.model.SearchResult +import com.cloudant.client.api.views._ +import com.cloudant.http.{Http, HttpConnection} +import com.cloudant.http.interceptors.Replay429Interceptor +import com.google.gson.{JsonObject, JsonParser} import org.apache.bahir.cloudant.common._ +import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter /* * Only allow one field pushdown now * as the filter today does not tell how to link the filters out And v.s. Or */ class CloudantConfig(val protocol: String, val host: String, - val dbName: String, val indexName: String, val viewName: String) + val dbName: String, val indexPath: String, val viewPath: String) (implicit val username: String, val password: String, val partitions: Int, val maxInPartition: Int, val minInPartition: Int, val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int, val createDBOnSave: Boolean, val endpoint: String, val useQuery: Boolean = false, val queryLimit: Int) extends Serializable { + @transient private lazy val client: CloudantClient = ClientBuilder +.url(getClientUrl) +.username(username) +.password(password) +.interceptors(Replay429Interceptor.WITH_DEFAULTS) --- End diff -- Also the defaults may be insufficient for some use cases. It may be worth exposing config option(s) for at least the number of retries if not the backoff. ---
[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/61#discussion_r159845769 --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala --- @@ -16,34 +16,127 @@ */ package org.apache.bahir.cloudant -import java.net.URLEncoder +import java.net.{URL, URLEncoder} -import play.api.libs.json.{JsArray, JsObject, Json, JsValue} +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.reflect.io.File + +import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database} +import com.cloudant.client.api.model.SearchResult +import com.cloudant.client.api.views._ +import com.cloudant.http.{Http, HttpConnection} +import com.cloudant.http.interceptors.Replay429Interceptor +import com.google.gson.{JsonObject, JsonParser} import org.apache.bahir.cloudant.common._ +import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter /* * Only allow one field pushdown now * as the filter today does not tell how to link the filters out And v.s. 
Or */ class CloudantConfig(val protocol: String, val host: String, - val dbName: String, val indexName: String, val viewName: String) + val dbName: String, val indexPath: String, val viewPath: String) (implicit val username: String, val password: String, val partitions: Int, val maxInPartition: Int, val minInPartition: Int, val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int, val createDBOnSave: Boolean, val endpoint: String, val useQuery: Boolean = false, val queryLimit: Int) extends Serializable { + @transient private lazy val client: CloudantClient = ClientBuilder +.url(getClientUrl) +.username(username) +.password(password) +.interceptors(Replay429Interceptor.WITH_DEFAULTS) +.build + @transient private lazy val database: Database = client.database(dbName, false) lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName} + lazy val designDoc: String = { +if (viewPath != null && viewPath.nonEmpty) { + viewPath.split("/")(1) +} else { +null +} + } + lazy val searchName: String = { +// verify that the index path matches '_design/ddoc/_search/searchname' +if (indexPath != null && indexPath.nonEmpty && indexPath.matches("\\w+\\/\\w+\\/\\w+\\/\\w+")) { + val splitPath = indexPath.split(File.separator) + // return 'design-doc/search-name' + splitPath(1) + File.separator + splitPath(3) +} else { + null +} + } + lazy val viewName: String = { +if (viewPath != null && viewPath.nonEmpty) { + val splitViewPath = viewPath.split(File.separator) + if(splitViewPath(3).contains("?")) { +splitViewPath(3).substring(0, splitViewPath(3).indexOf("?")) + } else { +splitViewPath(3) + } +} else { + null +} + } val pkField = "_id" val defaultIndex: String = endpoint val default_filter: String = "*:*" - def getDbUrl: String = { -dbUrl + def buildAllDocsRequest(limit: Int, includeDocs: Boolean = true): AllDocsRequestBuilder = { +var allDocsReq = database.getAllDocsRequestBuilder.includeDocs(includeDocs) +if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) 
{ + allDocsReq = allDocsReq.limit(limit) +} +allDocsReq + } + + def buildViewRequest(limit: Int, includeDocs: Boolean = true): + UnpaginatedRequestBuilder[String, String] = { +val viewReq = database.getViewRequestBuilder(designDoc, viewName) + .newRequest(Key.Type.STRING, classOf[String]) + .includeDocs(includeDocs) +if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) { + viewReq.limit(limit) +} +viewReq + } + + def buildSearchRequest(limit: Int): SearchResult[JsonObject] = { +val searchReq = database.search(searchName) +if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) { + searchReq.limit(limit) +} +searchReq.querySearchResult(default_filter, classOf[JsonObject]) + } + + def executeRequest(stringUrl: String, postData: String = null): HttpConnection = { +val url = new URL(stringUrl) +if(postData != null) { + val conn = Http.POST(url, "application/json") + conn.setRequestBody(postData)
[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/61#discussion_r159889787 --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala --- @@ -191,20 +199,30 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig) (0 until totalPartition).map(i => { val skip = i * limitPerPartition new JsonStoreRDDPartition(url, skip, limitPerPartition, i, - config, selector, fields, queryUsed).asInstanceOf[Partition] + config, selector, fields, queryUsed) +.asInstanceOf[Partition] }).toArray } override def compute(splitIn: Partition, context: TaskContext): Iterator[String] = { val myPartition = splitIn.asInstanceOf[JsonStoreRDDPartition] implicit val postData : String = { + val jsonObject = new JsonObject if (myPartition.queryUsed && myPartition.fields != null) { -Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" -> myPartition.fields, -"limit" -> myPartition.limit, "skip" -> myPartition.skip)) +// Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" -> +// myPartition.fields, "limit" -> myPartition.limit, "skip" -> myPartition.skip)) --- End diff -- remove ---
[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/61#discussion_r159845032 --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala --- @@ -16,34 +16,127 @@ */ package org.apache.bahir.cloudant -import java.net.URLEncoder +import java.net.{URL, URLEncoder} -import play.api.libs.json.{JsArray, JsObject, Json, JsValue} +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.reflect.io.File + +import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database} +import com.cloudant.client.api.model.SearchResult +import com.cloudant.client.api.views._ +import com.cloudant.http.{Http, HttpConnection} +import com.cloudant.http.interceptors.Replay429Interceptor +import com.google.gson.{JsonObject, JsonParser} import org.apache.bahir.cloudant.common._ +import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter /* * Only allow one field pushdown now * as the filter today does not tell how to link the filters out And v.s. Or */ class CloudantConfig(val protocol: String, val host: String, - val dbName: String, val indexName: String, val viewName: String) + val dbName: String, val indexPath: String, val viewPath: String) (implicit val username: String, val password: String, val partitions: Int, val maxInPartition: Int, val minInPartition: Int, val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int, val createDBOnSave: Boolean, val endpoint: String, val useQuery: Boolean = false, val queryLimit: Int) extends Serializable { + @transient private lazy val client: CloudantClient = ClientBuilder +.url(getClientUrl) +.username(username) +.password(password) +.interceptors(Replay429Interceptor.WITH_DEFAULTS) --- End diff -- It may be worth adding an additional interceptor here to change/augment the UA. ---
[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/61#discussion_r159847773 --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala --- @@ -54,20 +147,21 @@ class CloudantConfig(val protocol: String, val host: String, createDBOnSave } - def getLastNum(result: JsValue): JsValue = (result \ "last_seq").get + def getClientUrl: URL = { +new URL(protocol + "://" + host) + } + + def getLastNum(result: JsonObject): JsonObject = result.get("last_seq").getAsJsonObject /* Url containing limit for docs in a Cloudant database. * If a view is not defined, use the _all_docs endpoint. * @return url with one doc limit for retrieving total doc count */ def getUrl(limit: Int, excludeDDoc: Boolean = false): String = { --- End diff -- The comment suggests that this method is used to get a URL with a `limit=1` for the purposes of getting a doc count, but it is not used by `getTotalDocCount`; it does, however, appear to be used by `getMany` with a different limit for query results. FWIW, I can't see `getTotalDocCount` being called anywhere in the new code either. ---
[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/61#discussion_r159845874 --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala --- @@ -16,34 +16,127 @@ */ package org.apache.bahir.cloudant -import java.net.URLEncoder +import java.net.{URL, URLEncoder} -import play.api.libs.json.{JsArray, JsObject, Json, JsValue} +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.reflect.io.File + +import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database} +import com.cloudant.client.api.model.SearchResult +import com.cloudant.client.api.views._ +import com.cloudant.http.{Http, HttpConnection} +import com.cloudant.http.interceptors.Replay429Interceptor +import com.google.gson.{JsonObject, JsonParser} import org.apache.bahir.cloudant.common._ +import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter /* * Only allow one field pushdown now * as the filter today does not tell how to link the filters out And v.s. 
Or */ class CloudantConfig(val protocol: String, val host: String, - val dbName: String, val indexName: String, val viewName: String) + val dbName: String, val indexPath: String, val viewPath: String) (implicit val username: String, val password: String, val partitions: Int, val maxInPartition: Int, val minInPartition: Int, val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int, val createDBOnSave: Boolean, val endpoint: String, val useQuery: Boolean = false, val queryLimit: Int) extends Serializable { + @transient private lazy val client: CloudantClient = ClientBuilder +.url(getClientUrl) +.username(username) +.password(password) +.interceptors(Replay429Interceptor.WITH_DEFAULTS) +.build + @transient private lazy val database: Database = client.database(dbName, false) lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName} + lazy val designDoc: String = { +if (viewPath != null && viewPath.nonEmpty) { + viewPath.split("/")(1) +} else { +null +} + } + lazy val searchName: String = { +// verify that the index path matches '_design/ddoc/_search/searchname' +if (indexPath != null && indexPath.nonEmpty && indexPath.matches("\\w+\\/\\w+\\/\\w+\\/\\w+")) { + val splitPath = indexPath.split(File.separator) + // return 'design-doc/search-name' + splitPath(1) + File.separator + splitPath(3) +} else { + null +} + } + lazy val viewName: String = { +if (viewPath != null && viewPath.nonEmpty) { + val splitViewPath = viewPath.split(File.separator) + if(splitViewPath(3).contains("?")) { +splitViewPath(3).substring(0, splitViewPath(3).indexOf("?")) + } else { +splitViewPath(3) + } +} else { + null +} + } val pkField = "_id" val defaultIndex: String = endpoint val default_filter: String = "*:*" - def getDbUrl: String = { -dbUrl + def buildAllDocsRequest(limit: Int, includeDocs: Boolean = true): AllDocsRequestBuilder = { +var allDocsReq = database.getAllDocsRequestBuilder.includeDocs(includeDocs) +if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) 
{ + allDocsReq = allDocsReq.limit(limit) +} +allDocsReq + } + + def buildViewRequest(limit: Int, includeDocs: Boolean = true): + UnpaginatedRequestBuilder[String, String] = { +val viewReq = database.getViewRequestBuilder(designDoc, viewName) + .newRequest(Key.Type.STRING, classOf[String]) + .includeDocs(includeDocs) +if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) { + viewReq.limit(limit) +} +viewReq + } + + def buildSearchRequest(limit: Int): SearchResult[JsonObject] = { +val searchReq = database.search(searchName) +if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) { + searchReq.limit(limit) +} +searchReq.querySearchResult(default_filter, classOf[JsonObject]) + } + + def executeRequest(stringUrl: String, postData: String = null): HttpConnection = { +val url = new URL(stringUrl) +if(postData != null) { + val conn = Http.POST(url, "application/json") + conn.setRequestBody(postData)
[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/61#discussion_r159890761 --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala --- @@ -191,20 +199,30 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig) (0 until totalPartition).map(i => { val skip = i * limitPerPartition new JsonStoreRDDPartition(url, skip, limitPerPartition, i, - config, selector, fields, queryUsed).asInstanceOf[Partition] + config, selector, fields, queryUsed) +.asInstanceOf[Partition] }).toArray } override def compute(splitIn: Partition, context: TaskContext): Iterator[String] = { val myPartition = splitIn.asInstanceOf[JsonStoreRDDPartition] implicit val postData : String = { + val jsonObject = new JsonObject if (myPartition.queryUsed && myPartition.fields != null) { -Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" -> myPartition.fields, -"limit" -> myPartition.limit, "skip" -> myPartition.skip)) +// Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" -> +// myPartition.fields, "limit" -> myPartition.limit, "skip" -> myPartition.skip)) +jsonObject.add("selector", myPartition.selector) +jsonObject.add("fields", myPartition.fields) +jsonObject.addProperty("skip", myPartition.skip) +jsonObject.toString } else if (myPartition.queryUsed) { -Json.stringify(Json.obj("selector" -> myPartition.selector, "limit" -> myPartition.limit, -"skip" -> myPartition.skip)) +// Json.stringify(Json.obj("selector" -> myPartition.selector, +// "limit" -> myPartition.limit, "skip" -> myPartition.skip)) +jsonObject.add("selector", myPartition.selector) --- End diff -- Could add the `selector` and `skip` before the `if`/`else` and avoid the duplication ---
[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/61#discussion_r159889840 --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala --- @@ -191,20 +199,30 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig) (0 until totalPartition).map(i => { val skip = i * limitPerPartition new JsonStoreRDDPartition(url, skip, limitPerPartition, i, - config, selector, fields, queryUsed).asInstanceOf[Partition] + config, selector, fields, queryUsed) +.asInstanceOf[Partition] }).toArray } override def compute(splitIn: Partition, context: TaskContext): Iterator[String] = { val myPartition = splitIn.asInstanceOf[JsonStoreRDDPartition] implicit val postData : String = { + val jsonObject = new JsonObject if (myPartition.queryUsed && myPartition.fields != null) { -Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" -> myPartition.fields, -"limit" -> myPartition.limit, "skip" -> myPartition.skip)) +// Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" -> +// myPartition.fields, "limit" -> myPartition.limit, "skip" -> myPartition.skip)) +jsonObject.add("selector", myPartition.selector) +jsonObject.add("fields", myPartition.fields) +jsonObject.addProperty("skip", myPartition.skip) +jsonObject.toString } else if (myPartition.queryUsed) { -Json.stringify(Json.obj("selector" -> myPartition.selector, "limit" -> myPartition.limit, -"skip" -> myPartition.skip)) +// Json.stringify(Json.obj("selector" -> myPartition.selector, +// "limit" -> myPartition.limit, "skip" -> myPartition.skip)) --- End diff -- remove ---
[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/61#discussion_r159845855 --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala --- @@ -16,34 +16,127 @@ */ package org.apache.bahir.cloudant -import java.net.URLEncoder +import java.net.{URL, URLEncoder} -import play.api.libs.json.{JsArray, JsObject, Json, JsValue} +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.reflect.io.File + +import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database} +import com.cloudant.client.api.model.SearchResult +import com.cloudant.client.api.views._ +import com.cloudant.http.{Http, HttpConnection} +import com.cloudant.http.interceptors.Replay429Interceptor +import com.google.gson.{JsonObject, JsonParser} import org.apache.bahir.cloudant.common._ +import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter /* * Only allow one field pushdown now * as the filter today does not tell how to link the filters out And v.s. 
Or */ class CloudantConfig(val protocol: String, val host: String, - val dbName: String, val indexName: String, val viewName: String) + val dbName: String, val indexPath: String, val viewPath: String) (implicit val username: String, val password: String, val partitions: Int, val maxInPartition: Int, val minInPartition: Int, val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int, val createDBOnSave: Boolean, val endpoint: String, val useQuery: Boolean = false, val queryLimit: Int) extends Serializable { + @transient private lazy val client: CloudantClient = ClientBuilder +.url(getClientUrl) +.username(username) +.password(password) +.interceptors(Replay429Interceptor.WITH_DEFAULTS) +.build + @transient private lazy val database: Database = client.database(dbName, false) lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName} + lazy val designDoc: String = { +if (viewPath != null && viewPath.nonEmpty) { + viewPath.split("/")(1) +} else { +null +} + } + lazy val searchName: String = { +// verify that the index path matches '_design/ddoc/_search/searchname' +if (indexPath != null && indexPath.nonEmpty && indexPath.matches("\\w+\\/\\w+\\/\\w+\\/\\w+")) { + val splitPath = indexPath.split(File.separator) + // return 'design-doc/search-name' + splitPath(1) + File.separator + splitPath(3) +} else { + null +} + } + lazy val viewName: String = { +if (viewPath != null && viewPath.nonEmpty) { + val splitViewPath = viewPath.split(File.separator) + if(splitViewPath(3).contains("?")) { +splitViewPath(3).substring(0, splitViewPath(3).indexOf("?")) + } else { +splitViewPath(3) + } +} else { + null +} + } val pkField = "_id" val defaultIndex: String = endpoint val default_filter: String = "*:*" - def getDbUrl: String = { -dbUrl + def buildAllDocsRequest(limit: Int, includeDocs: Boolean = true): AllDocsRequestBuilder = { +var allDocsReq = database.getAllDocsRequestBuilder.includeDocs(includeDocs) +if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) 
{ + allDocsReq = allDocsReq.limit(limit) +} +allDocsReq + } + + def buildViewRequest(limit: Int, includeDocs: Boolean = true): + UnpaginatedRequestBuilder[String, String] = { +val viewReq = database.getViewRequestBuilder(designDoc, viewName) + .newRequest(Key.Type.STRING, classOf[String]) + .includeDocs(includeDocs) +if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) { + viewReq.limit(limit) +} +viewReq + } + + def buildSearchRequest(limit: Int): SearchResult[JsonObject] = { +val searchReq = database.search(searchName) +if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) { + searchReq.limit(limit) +} +searchReq.querySearchResult(default_filter, classOf[JsonObject]) + } + + def executeRequest(stringUrl: String, postData: String = null): HttpConnection = { +val url = new URL(stringUrl) +if(postData != null) { + val conn = Http.POST(url, "application/json") + conn.setRequestBody(postData)
[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/61#discussion_r159889644 --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala --- @@ -174,7 +178,11 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig) implicit val postData : String = { if (queryUsed) { -Json.stringify(Json.obj("selector" -> selector, "limit" -> 1)) +val jsonSelector = new JsonObject +jsonSelector.addProperty("selector", selector.toString) +jsonSelector.addProperty("limit", 1) +jsonSelector.toString +// Json.stringify(Json.obj("selector" -> selector, "limit" -> 1)) --- End diff -- remove ---
[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/61#discussion_r159889542 --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala --- @@ -101,24 +105,24 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig) case LessThanOrEqual(attr, v) => ("$lte", v) case _ => (null, null) } -val convertedV: JsValue = { +val convertedV: JsonElement = { // TODO Better handing of other types if (value != null) { value match { - case s: String => Json.toJson(s) - case l: Long => Json.toJson(l) - case d: Double => Json.toJson(d) - case i: Int => Json.toJson(i) - case b: Boolean => Json.toJson(b) - case t: java.sql.Timestamp => Json.toJson(t) + case s: String => parser.parse(s)// Json.toJson(s) --- End diff -- There is some old, commented-out code here that should probably be removed (and on the following line). ---
[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/61#discussion_r159845718 --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala --- @@ -16,34 +16,127 @@ */ package org.apache.bahir.cloudant -import java.net.URLEncoder +import java.net.{URL, URLEncoder} -import play.api.libs.json.{JsArray, JsObject, Json, JsValue} +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.reflect.io.File + +import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database} +import com.cloudant.client.api.model.SearchResult +import com.cloudant.client.api.views._ +import com.cloudant.http.{Http, HttpConnection} +import com.cloudant.http.interceptors.Replay429Interceptor +import com.google.gson.{JsonObject, JsonParser} import org.apache.bahir.cloudant.common._ +import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter /* * Only allow one field pushdown now * as the filter today does not tell how to link the filters out And v.s. 
Or */ class CloudantConfig(val protocol: String, val host: String, - val dbName: String, val indexName: String, val viewName: String) + val dbName: String, val indexPath: String, val viewPath: String) (implicit val username: String, val password: String, val partitions: Int, val maxInPartition: Int, val minInPartition: Int, val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int, val createDBOnSave: Boolean, val endpoint: String, val useQuery: Boolean = false, val queryLimit: Int) extends Serializable { + @transient private lazy val client: CloudantClient = ClientBuilder +.url(getClientUrl) +.username(username) +.password(password) +.interceptors(Replay429Interceptor.WITH_DEFAULTS) +.build + @transient private lazy val database: Database = client.database(dbName, false) lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName} + lazy val designDoc: String = { +if (viewPath != null && viewPath.nonEmpty) { + viewPath.split("/")(1) +} else { +null +} + } + lazy val searchName: String = { +// verify that the index path matches '_design/ddoc/_search/searchname' +if (indexPath != null && indexPath.nonEmpty && indexPath.matches("\\w+\\/\\w+\\/\\w+\\/\\w+")) { + val splitPath = indexPath.split(File.separator) + // return 'design-doc/search-name' + splitPath(1) + File.separator + splitPath(3) +} else { + null +} + } + lazy val viewName: String = { +if (viewPath != null && viewPath.nonEmpty) { + val splitViewPath = viewPath.split(File.separator) + if(splitViewPath(3).contains("?")) { +splitViewPath(3).substring(0, splitViewPath(3).indexOf("?")) + } else { +splitViewPath(3) + } +} else { + null +} + } val pkField = "_id" val defaultIndex: String = endpoint val default_filter: String = "*:*" - def getDbUrl: String = { -dbUrl + def buildAllDocsRequest(limit: Int, includeDocs: Boolean = true): AllDocsRequestBuilder = { +var allDocsReq = database.getAllDocsRequestBuilder.includeDocs(includeDocs) +if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) 
{ + allDocsReq = allDocsReq.limit(limit) +} +allDocsReq + } + + def buildViewRequest(limit: Int, includeDocs: Boolean = true): + UnpaginatedRequestBuilder[String, String] = { +val viewReq = database.getViewRequestBuilder(designDoc, viewName) + .newRequest(Key.Type.STRING, classOf[String]) + .includeDocs(includeDocs) +if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) { + viewReq.limit(limit) +} +viewReq + } + + def buildSearchRequest(limit: Int): SearchResult[JsonObject] = { +val searchReq = database.search(searchName) +if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) { + searchReq.limit(limit) +} +searchReq.querySearchResult(default_filter, classOf[JsonObject]) + } + + def executeRequest(stringUrl: String, postData: String = null): HttpConnection = { +val url = new URL(stringUrl) +if(postData != null) { + val conn = Http.POST(url, "application/json") + conn.setRequestBody(postData)
[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/61#discussion_r159848274 --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala --- @@ -95,15 +189,49 @@ class CloudantConfig(val protocol: String, val host: String, } } + def getTotalDocCount: Int = { +val limit = 1 +if (viewPath != null) { + // "limit=" + limit + "=" + skip + buildViewRequest(limit, includeDocs = false).build().getResponse.getTotalRowCount.toInt +} else { + // /_all_docs?limit=1 + // Note: java-cloudant's AllDocsRequest doesn't have a getTotalRowCount method + // buildAllDocsRequest(1, includeDocs = false).build().getResponse.getTotalRowCount.toInt + val response = client.executeRequest(Http.GET( +new URL(database.getDBUri + File.separator + endpoint + "?limit=" + limit))) + getResultTotalRows(response.responseAsString) +} + } + + def getDocs(limit: Int): List[JsonObject] = { +if (viewPath != null) { + // "limit=" + limit + "=" + skip --- End diff -- remove? ---
[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/61#discussion_r159709935 --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala --- @@ -95,15 +189,49 @@ class CloudantConfig(val protocol: String, val host: String, } } + def getTotalDocCount: Int = { +val limit = 1 +if (viewPath != null) { + // "limit=" + limit + "=" + skip + buildViewRequest(limit, includeDocs = false).build().getResponse.getTotalRowCount.toInt +} else { + // /_all_docs?limit=1 + // Note: java-cloudant's AllDocsRequest doesn't have a getTotalRowCount method + // buildAllDocsRequest(1, includeDocs = false).build().getResponse.getTotalRowCount.toInt + val response = client.executeRequest(Http.GET( +new URL(database.getDBUri + File.separator + endpoint + "?limit=" + limit))) + getResultTotalRows(response.responseAsString) +} + } + + def getDocs(limit: Int): List[JsonObject] = { +if (viewPath != null) { + // "limit=" + limit + "=" + skip + buildViewRequest(limit).build().getResponse.getDocsAs(classOf[JsonObject]).asScala.toList +} else if (indexPath != null) { + var searchDocs = mutable.ListBuffer[JsonObject]() + for (result: SearchResult[JsonObject]#SearchResultRow <- + buildSearchRequest(limit).getRows.asScala) { +searchDocs += result.getDoc + } + searchDocs.toList +} else { + // /_all_docs?limit=1 + // val response = client.executeRequest(Http.GET( --- End diff -- Remove commented out code? ---
[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/61#discussion_r159699798 --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala --- @@ -16,34 +16,127 @@ */ package org.apache.bahir.cloudant -import java.net.URLEncoder +import java.net.{URL, URLEncoder} -import play.api.libs.json.{JsArray, JsObject, Json, JsValue} +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.reflect.io.File + +import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database} +import com.cloudant.client.api.model.SearchResult +import com.cloudant.client.api.views._ +import com.cloudant.http.{Http, HttpConnection} +import com.cloudant.http.interceptors.Replay429Interceptor +import com.google.gson.{JsonObject, JsonParser} import org.apache.bahir.cloudant.common._ +import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter /* * Only allow one field pushdown now * as the filter today does not tell how to link the filters out And v.s. 
Or */ class CloudantConfig(val protocol: String, val host: String, - val dbName: String, val indexName: String, val viewName: String) + val dbName: String, val indexPath: String, val viewPath: String) (implicit val username: String, val password: String, val partitions: Int, val maxInPartition: Int, val minInPartition: Int, val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int, val createDBOnSave: Boolean, val endpoint: String, val useQuery: Boolean = false, val queryLimit: Int) extends Serializable { + @transient private lazy val client: CloudantClient = ClientBuilder +.url(getClientUrl) +.username(username) +.password(password) +.interceptors(Replay429Interceptor.WITH_DEFAULTS) +.build + @transient private lazy val database: Database = client.database(dbName, false) lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName} + lazy val designDoc: String = { +if (viewPath != null && viewPath.nonEmpty) { + viewPath.split("/")(1) +} else { +null +} + } + lazy val searchName: String = { +// verify that the index path matches '_design/ddoc/_search/searchname' +if (indexPath != null && indexPath.nonEmpty && indexPath.matches("\\w+\\/\\w+\\/\\w+\\/\\w+")) { + val splitPath = indexPath.split(File.separator) --- End diff -- If you separate out the regex pattern from earlier you could use the captured groups to extract the design doc ID and search index name without needing to do more splits here. ---
[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/61#discussion_r159709382 --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala --- @@ -95,15 +189,49 @@ class CloudantConfig(val protocol: String, val host: String, } } + def getTotalDocCount: Int = { +val limit = 1 +if (viewPath != null) { + // "limit=" + limit + "&skip=" + skip + buildViewRequest(limit, includeDocs = false).build().getResponse.getTotalRowCount.toInt +} else { + // /_all_docs?limit=1 + // Note: java-cloudant's AllDocsRequest doesn't have a getTotalRowCount method + // buildAllDocsRequest(1, includeDocs = false).build().getResponse.getTotalRowCount.toInt + val response = client.executeRequest(Http.GET( +new URL(database.getDBUri + File.separator + endpoint + "?limit=" + limit))) + getResultTotalRows(response.responseAsString) --- End diff -- It might be easier to use `com.cloudant.client.api.model.DbInfo#getDocCount` instead of trying to do it via `_all_docs`. ---
[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/61#discussion_r159699042 --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala --- @@ -16,34 +16,127 @@ */ package org.apache.bahir.cloudant -import java.net.URLEncoder +import java.net.{URL, URLEncoder} -import play.api.libs.json.{JsArray, JsObject, Json, JsValue} +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.reflect.io.File + +import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database} +import com.cloudant.client.api.model.SearchResult +import com.cloudant.client.api.views._ +import com.cloudant.http.{Http, HttpConnection} +import com.cloudant.http.interceptors.Replay429Interceptor +import com.google.gson.{JsonObject, JsonParser} import org.apache.bahir.cloudant.common._ +import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter /* * Only allow one field pushdown now * as the filter today does not tell how to link the filters out And v.s. 
Or */ class CloudantConfig(val protocol: String, val host: String, - val dbName: String, val indexName: String, val viewName: String) + val dbName: String, val indexPath: String, val viewPath: String) (implicit val username: String, val password: String, val partitions: Int, val maxInPartition: Int, val minInPartition: Int, val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int, val createDBOnSave: Boolean, val endpoint: String, val useQuery: Boolean = false, val queryLimit: Int) extends Serializable { + @transient private lazy val client: CloudantClient = ClientBuilder +.url(getClientUrl) +.username(username) +.password(password) +.interceptors(Replay429Interceptor.WITH_DEFAULTS) +.build + @transient private lazy val database: Database = client.database(dbName, false) lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName} + lazy val designDoc: String = { +if (viewPath != null && viewPath.nonEmpty) { + viewPath.split("/")(1) +} else { +null +} + } + lazy val searchName: String = { +// verify that the index path matches '_design/ddoc/_search/searchname' +if (indexPath != null && indexPath.nonEmpty && indexPath.matches("\\w+\\/\\w+\\/\\w+\\/\\w+")) { --- End diff -- I think you could leverage Scala's raw interpolator here to make the regex a bit less escape-y: `raw"\w+\/\w+\/\w+\/\w+"` Also, are word characters sufficient here? Aren't there some other characters that could be part of a design document id and allowed unencoded in a URL path (e.g. maybe `@`?) that would be excluded by this regex? I'd be inclined to maybe use something like: `_design\/([^\/]+)\/_search\/([^\/]+)` ---
[GitHub] bahir pull request #57: [BAHIR-128][WIP] fix failing sql-cloudant test
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/57#discussion_r157480164 --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala --- @@ -125,7 +125,7 @@ class DefaultSource extends RelationProvider /* Create a streaming context to handle transforming docs in * larger databases into Spark datasets */ - val ssc = new StreamingContext(sqlContext.sparkContext, Seconds(10)) + val ssc = new StreamingContext(sqlContext.sparkContext, Seconds(8)) --- End diff -- I'm amazed this parameter was hard-coded as it seems to be a fairly critical tuning parameter for streaming. I guess this is one for another PR. ---
[GitHub] bahir pull request #57: [BAHIR-128][WIP] fix failing sql-cloudant test
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/57#discussion_r157484722 --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala --- @@ -39,56 +37,38 @@ class ChangesReceiver(config: CloudantChangesConfig) } private def receive(): Unit = { -// Get total number of docs in database using _all_docs endpoint -val limit = new JsonStoreDataAccess(config) - .getTotalRows(config.getTotalUrl, queryUsed = false) - -// Get continuous _changes url +// Get normal _changes url --- End diff -- I'm a bit confused about this change. Since Spark Streaming is the basis for "real-time" or "continuous" applications, doesn't this need to keep listening to the changes feed and wait for more changes? ---
[GitHub] bahir pull request #57: [BAHIR-128][WIP] fix failing sql-cloudant test
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/57#discussion_r157478713 --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala --- @@ -67,7 +67,8 @@ class CloudantChangesConfig(protocol: String, host: String, dbName: String, } def getChangesReceiverUrl: String = { -var url = dbUrl + "/" + defaultIndex + "?include_docs=true&feed=continuous&timeout=" + timeout +var url = dbUrl + "/" + defaultIndex + "?include_docs=true&feed=normal" + + "&seq_interval=1&timeout=" + timeout --- End diff -- What do you think about making the `seq_interval` the `bulkSize` instead of hard-coding it to 1? ---
[GitHub] bahir pull request #57: [BAHIR-128][WIP] fix failing sql-cloudant test
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/57#discussion_r157479639 --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala --- @@ -39,56 +37,38 @@ class ChangesReceiver(config: CloudantChangesConfig) } private def receive(): Unit = { -// Get total number of docs in database using _all_docs endpoint -val limit = new JsonStoreDataAccess(config) - .getTotalRows(config.getTotalUrl, queryUsed = false) - -// Get continuous _changes url +// Get normal _changes url val url = config.getChangesReceiverUrl.toString val selector: String = { "{\"selector\":" + config.getSelector + "}" } -var count = 0 +// var count = 0 --- End diff -- Delete this commented-out line rather than leaving it in? ---
[GitHub] bahir-website pull request #13: Fixed typos in Apache CouchDB
GitHub user ricellis opened a pull request: https://github.com/apache/bahir-website/pull/13 Fixed typos in Apache CouchDB Apache CouchDB is incorrectly referred to as `CounchDB` twice on the website. This PR corrects those typos. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ricellis/bahir-website couchdb-typos Alternatively you can review and apply these changes as the patch at: https://github.com/apache/bahir-website/pull/13.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #13 commit b18d0ed860e53df588f09237deed30805b9d1e49 Author: Rich Ellis <ricel...@users.noreply.github.com> Date: 2017-11-17T09:25:50Z Fixed typos in Apache CouchDB ---
[GitHub] bahir pull request #45: [WIP] [BAHIR-110] Implement _changes API for non-str...
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/45#discussion_r126401728 --- Diff: sql-cloudant/README.md --- @@ -31,15 +31,14 @@ The `--packages` argument can also be used with `bin/spark-submit`. Submit a job in Python: -spark-submit --master local[4] --jars +spark-submit --master local[4] --packages org.apache.bahir:spark-sql-cloudant_2.11:2.2.0-SNAPSHOT Submit a job in Scala: - spark-submit --class "" --master local[4] --jars + spark-submit --class "" --master local[4] --packages org.apache.bahir:spark-sql-cloudant_2.11:2.2.0-SNAPSHOT --- End diff -- `SNAPSHOT` again? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] bahir pull request #45: [WIP] [BAHIR-110] Implement _changes API for non-str...
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/45#discussion_r126402346 --- Diff: sql-cloudant/README.md --- @@ -52,39 +51,71 @@ Here each subsequent configuration overrides the previous one. Thus, configurati ### Configuration in application.conf -Default values are defined in [here](cloudant-spark-sql/src/main/resources/application.conf). +Default values are defined in [here](src/main/resources/application.conf). ### Configuration on SparkConf Name | Default | Meaning --- |:---:| --- +cloudant.apiReceiver|"_all_docs"| API endpoint for RelationProvider when loading or saving data from Cloudant to DataFrames or SQL temporary tables. Select between "_all_docs" or "_changes" endpoint. --- End diff -- I would say "Cloudant API endpoint to use for ..." or "Select between the Cloudant `_all_docs` or `_changes` endpoint". It is probably also worth adding a "see below for details|notes|further explanation" pointer, or similar, to help people find the content that would help them choose the appropriate endpoint. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] bahir pull request #45: [WIP] [BAHIR-110] Implement _changes API for non-str...
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/45#discussion_r126438628 --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala --- @@ -98,29 +99,89 @@ class DefaultSource extends RelationProvider val config: CloudantConfig = JsonStoreConfigManager.getConfig(sqlContext, parameters) - var allDocsDF: DataFrame = null + var dataFrame: DataFrame = null val schema: StructType = { if (inSchema != null) { inSchema -} else { - val df = if (config.getSchemaSampleSize() == -JsonStoreConfigManager.ALL_DOCS_LIMIT && +} else if (!config.isInstanceOf[CloudantChangesConfig] + || config.viewName != null || config.indexName != null) { + val df = if (config.getSchemaSampleSize == +JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT && config.viewName == null && config.indexName == null) { val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config) -allDocsDF = sqlContext.read.json(cloudantRDD) -allDocsDF +dataFrame = sqlContext.read.json(cloudantRDD) +dataFrame } else { val dataAccess = new JsonStoreDataAccess(config) val aRDD = sqlContext.sparkContext.parallelize( -dataAccess.getMany(config.getSchemaSampleSize())) +dataAccess.getMany(config.getSchemaSampleSize)) sqlContext.read.json(aRDD) } df.schema +} else { + /* Create a streaming context to handle transforming docs in + * larger databases into Spark datasets + */ + /* Allow the raw data and persisted RDDs to be accessible outside + * of the streaming context. + * See https://spark.apache.org/docs/latest/configuration.html + * for more details. 
+ */ + sqlContext.sparkSession.conf.set("spark.streaming.unpersist", "false") + + val ssc = new StreamingContext(sqlContext.sparkContext, Seconds(10)) + val streamingMap = { +val selector = config.asInstanceOf[CloudantChangesConfig].getSelector +if (selector != null) { + Map( +"database" -> config.getDbname, +"selector" -> selector + ) +} else { + Map( +"database" -> config.getDbname + ) +} + } + + val changes = ssc.receiverStream( +new CloudantReceiver(sqlContext.sparkContext.getConf, streamingMap)) + changes.persist(config.asInstanceOf[CloudantChangesConfig] +.getStorageLevelForStreaming) + + // Global RDD that's created from union of all RDDs + var globalRDD = ssc.sparkContext.emptyRDD[String] + + logger.info("Loading data from Cloudant using " ++ config.asInstanceOf[CloudantChangesConfig].getContinuousChangesUrl) + + // Collect and union each RDD to convert all RDDs to a DataFrame + changes.foreachRDD((rdd: RDD[String]) => { +if (!rdd.isEmpty()) { + if (globalRDD != null) { +// Union RDDs in foreach loop +globalRDD = globalRDD.union(rdd) + } else { +globalRDD = rdd + } +} else { + // Convert final global RDD[String] to DataFrame + dataFrame = sqlContext.sparkSession.read.json(globalRDD) + ssc.stop(stopSparkContext = false, stopGracefully = false) +} + }) + + ssc.start + // run streaming until all docs from continuous feed are received + ssc.awaitTermination + // ssc.stop(stopSparkContext = false, stopGracefully = false) --- End diff -- Commented out code? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] bahir pull request #45: [WIP] [BAHIR-110] Implement _changes API for non-str...
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/45#discussion_r126435531 --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala --- @@ -16,23 +16,20 @@ */ package org.apache.bahir.cloudant -// scalastyle:off -import scalaj.http._ - import play.api.libs.json.Json +import scalaj.http._ +import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver.Receiver -import org.apache.spark.SparkConf import org.apache.bahir.cloudant.common._ -// scalastyle:on class CloudantReceiver(sparkConf: SparkConf, cloudantParams: Map[String, String]) extends Receiver[String](StorageLevel.MEMORY_AND_DISK) { - lazy val config: CloudantConfig = { + lazy val config: CloudantChangesConfig = { --- End diff -- Would this class be better named `CloudantChangesReceiver`? Since it doesn't appear to be a general case. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] bahir pull request #45: [WIP] [BAHIR-110] Implement _changes API for non-str...
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/45#discussion_r126409663 --- Diff: sql-cloudant/README.md --- @@ -52,39 +51,71 @@ Here each subsequent configuration overrides the previous one. Thus, configurati ### Configuration in application.conf -Default values are defined in [here](cloudant-spark-sql/src/main/resources/application.conf). +Default values are defined in [here](src/main/resources/application.conf). ### Configuration on SparkConf Name | Default | Meaning --- |:---:| --- +cloudant.apiReceiver|"_all_docs"| API endpoint for RelationProvider when loading or saving data from Cloudant to DataFrames or SQL temporary tables. Select between "_all_docs" or "_changes" endpoint. cloudant.protocol|https|protocol to use to transfer data: http or https -cloudant.host||cloudant host url -cloudant.username||cloudant userid -cloudant.password||cloudant password +cloudant.host| |cloudant host url +cloudant.username| |cloudant userid +cloudant.password| |cloudant password cloudant.useQuery|false|By default, _all_docs endpoint is used if configuration 'view' and 'index' (see below) are not set. When useQuery is enabled, _find endpoint will be used in place of _all_docs when query condition is not on primary key field (_id), so that query predicates may be driven into datastore. cloudant.queryLimit|25|The maximum number of results returned when querying the _find endpoint. jsonstore.rdd.partitions|10|the number of partitions intent used to drive JsonStoreRDD loading query result in parallel. The actual number is calculated based on total rows returned and satisfying maxInPartition and minInPartition jsonstore.rdd.maxInPartition|-1|the max rows in a partition. -1 means unlimited jsonstore.rdd.minInPartition|10|the min rows in a partition. jsonstore.rdd.requestTimeout|90| the request timeout in milliseconds bulkSize|200| the bulk save size -schemaSampleSize| "-1" | the sample size for RDD schema discovery. 
1 means we are using only first document for schema discovery; -1 means all documents; 0 will be treated as 1; any number N means min(N, total) docs -createDBOnSave|"false"| whether to create a new database during save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with a provided name already exists, an error will be raised. +schemaSampleSize|-1| the sample size for RDD schema discovery. 1 means we are using only first document for schema discovery; -1 means all documents; 0 will be treated as 1; any number N means min(N, total) docs +createDBOnSave|false| whether to create a new database during save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with a provided name already exists, an error will be raised. + +The `cloudant.apiReceiver` option allows for _changes or _all_docs API endpoint to be called while loading Cloudant data into Spark DataFrames or SQL Tables, +or saving data from DataFrames or SQL Tables to a Cloudant database. + +**Note:** When using `_changes` API, please consider: +1. Results are partially ordered and may not be presented in order in +which documents were updated. +2. In case of shards' unavailability, you may see duplicate results (changes that have been seen already) +3. Can use `selector` option to filter Cloudant docs during load +4. Supports a real snapshot of the database and represents it in a single point of time. +5. Only supports single threaded + + +When using `_all_docs` API: +1. Supports parallel reads (using offset and range) +2. Using partitions may not represent the true snapshot of a database. Some docs + may be added or deleted in the database between loading data into different + Spark partitions. + +Performance of `_changes` API is still better in most cases (even with single threaded support). 
+During several performance tests using 200 MB to 15 GB Cloudant databases, load time from Cloudant to Spark using +`_changes` feed was faster to complete every time compared to `_all_docs`. + +See [CloudantChangesDFSuite](src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala) +for examples of loading data into a Spark DataFrame with `_changes` API. ### Configuration on Spark SQL Temporary Table or DataFrame Besides all the configurations passed to a temporary table or dataframe through SparkConf, it is also possible to set the following configurations in temporary table or dataframe using OPTIONS: Name | Default | Meaning --- |:---:| --- -database||cloudant database n
[GitHub] bahir pull request #45: [WIP] [BAHIR-110] Implement _changes API for non-str...
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/45#discussion_r126428690 --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala --- @@ -30,81 +28,83 @@ import org.apache.bahir.cloudant.common._ */ class CloudantConfig(val protocol: String, val host: String, -val dbName: String, val indexName: String = null, val viewName: String = null) +val dbName: String, val indexName: String, val viewName: String) (implicit val username: String, val password: String, val partitions: Int, val maxInPartition: Int, val minInPartition: Int, val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int, -val createDBOnSave: Boolean, val selector: String, val useQuery: Boolean = false, -val queryLimit: Int) -extends Serializable{ +val createDBOnSave: Boolean, val apiReceiver: String, +val useQuery: Boolean = false, val queryLimit: Int) +extends Serializable { - private lazy val dbUrl = {protocol + "://" + host + "/" + dbName} + lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName} val pkField = "_id" - val defaultIndex = "_all_docs" // "_changes" does not work for partition + val defaultIndex: String = apiReceiver val default_filter: String = "*:*" - def getContinuousChangesUrl(): String = { -var url = dbUrl + "/_changes?include_docs=true=continuous=3000" -if (selector != null) { - url = url + "=_selector" -} -url - } - - def getSelector() : String = { -selector - } - - def getDbUrl(): String = { + def getDbUrl: String = { dbUrl } - def getSchemaSampleSize(): Int = { + def getSchemaSampleSize: Int = { schemaSampleSize } - def getCreateDBonSave(): Boolean = { + def getCreateDBonSave: Boolean = { createDBOnSave } - def getTotalUrl(url: String): String = { -if (url.contains('?')) { - url + "=1" -} else { - url + "?limit=1" -} - } - - def getDbname(): String = { -dbName - } - - def queryEnabled(): Boolean = {useQuery && indexName==null && viewName==null} - - def allowPartition(queryUsed: 
Boolean): Boolean = {indexName==null && !queryUsed} - - def getAllDocsUrl(limit: Int, excludeDDoc: Boolean = false): String = { + def getLastNum(result: JsValue): JsValue = (result \ "last_seq").get + /* Url containing limit for docs in a Cloudant database. + * If a view is not defined, use the _all_docs endpoint. + * @return url with one doc limit for retrieving total doc count + */ + def getUrl(limit: Int, excludeDDoc: Boolean = false): String = { if (viewName == null) { - val baseUrl = ( - if ( excludeDDoc) dbUrl + "/_all_docs?startkey=%22_design0/%22_docs=true" - else dbUrl + "/_all_docs?include_docs=true" - ) - if (limit == JsonStoreConfigManager.ALL_DOCS_LIMIT) { + val baseUrl = { +if (excludeDDoc) { + dbUrl + "/_all_docs?startkey=%22_design0/%22_docs=true" --- End diff -- See https://github.com/cloudant/java-cloudant/issues/344#issuecomment-276938689 This `startkey` makes the assumption that no document IDs will start with upper case letters. Possibly one for another issue. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] bahir pull request #45: [WIP] [BAHIR-110] Implement _changes API for non-str...
Github user ricellis commented on a diff in the pull request: https://github.com/apache/bahir/pull/45#discussion_r126402723 --- Diff: sql-cloudant/README.md --- @@ -52,39 +51,71 @@ Here each subsequent configuration overrides the previous one. Thus, configurati ### Configuration in application.conf -Default values are defined in [here](cloudant-spark-sql/src/main/resources/application.conf). +Default values are defined in [here](src/main/resources/application.conf). ### Configuration on SparkConf Name | Default | Meaning --- |:---:| --- +cloudant.apiReceiver|"_all_docs"| API endpoint for RelationProvider when loading or saving data from Cloudant to DataFrames or SQL temporary tables. Select between "_all_docs" or "_changes" endpoint. cloudant.protocol|https|protocol to use to transfer data: http or https -cloudant.host||cloudant host url -cloudant.username||cloudant userid -cloudant.password||cloudant password +cloudant.host| |cloudant host url +cloudant.username| |cloudant userid +cloudant.password| |cloudant password cloudant.useQuery|false|By default, _all_docs endpoint is used if configuration 'view' and 'index' (see below) are not set. When useQuery is enabled, _find endpoint will be used in place of _all_docs when query condition is not on primary key field (_id), so that query predicates may be driven into datastore. cloudant.queryLimit|25|The maximum number of results returned when querying the _find endpoint. jsonstore.rdd.partitions|10|the number of partitions intent used to drive JsonStoreRDD loading query result in parallel. The actual number is calculated based on total rows returned and satisfying maxInPartition and minInPartition jsonstore.rdd.maxInPartition|-1|the max rows in a partition. -1 means unlimited jsonstore.rdd.minInPartition|10|the min rows in a partition. jsonstore.rdd.requestTimeout|90| the request timeout in milliseconds bulkSize|200| the bulk save size -schemaSampleSize| "-1" | the sample size for RDD schema discovery. 
1 means we are using only first document for schema discovery; -1 means all documents; 0 will be treated as 1; any number N means min(N, total) docs -createDBOnSave|"false"| whether to create a new database during save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with a provided name already exists, an error will be raised. +schemaSampleSize|-1| the sample size for RDD schema discovery. 1 means we are using only first document for schema discovery; -1 means all documents; 0 will be treated as 1; any number N means min(N, total) docs --- End diff -- using only **the** first document --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---