Github user mayya-sharipova commented on a diff in the pull request:
https://github.com/apache/bahir/pull/45#discussion_r127467315
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala ---
@@ -98,29 +99,81 @@ class DefaultSource extends RelationProvider
     val config: CloudantConfig =
       JsonStoreConfigManager.getConfig(sqlContext, parameters)
-    var allDocsDF: DataFrame = null
+    var dataFrame: DataFrame = null
     val schema: StructType = {
       if (inSchema != null) {
         inSchema
-      } else {
-        val df = if (config.getSchemaSampleSize() ==
-          JsonStoreConfigManager.ALL_DOCS_LIMIT &&
+      } else if (!config.isInstanceOf[CloudantChangesConfig]
+        || config.viewName != null || config.indexName != null) {
+        val df = if (config.getSchemaSampleSize ==
+          JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT &&
           config.viewName == null
           && config.indexName == null) {
           val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext,
             config)
-          allDocsDF = sqlContext.read.json(cloudantRDD)
-          allDocsDF
+          dataFrame = sqlContext.read.json(cloudantRDD)
+          dataFrame
         } else {
           val dataAccess = new JsonStoreDataAccess(config)
           val aRDD = sqlContext.sparkContext.parallelize(
-            dataAccess.getMany(config.getSchemaSampleSize()))
+            dataAccess.getMany(config.getSchemaSampleSize))
           sqlContext.read.json(aRDD)
         }
         df.schema
+      } else {
+        /* Create a streaming context to handle transforming docs in
+         * larger databases into Spark datasets
+         */
+        val ssc = new StreamingContext(sqlContext.sparkContext, Seconds(10))
+        val streamingMap = {
+          val selector = config.asInstanceOf[CloudantChangesConfig].getSelector
+          if (selector != null) {
+            Map(
+              "database" -> config.getDbname,
+              "selector" -> selector
+            )
+          } else {
+            Map(
+              "database" -> config.getDbname
+            )
+          }
+        }
+
+        val changes = ssc.receiverStream(
+          new CloudantReceiver(sqlContext.sparkContext.getConf, streamingMap))
+        changes.persist(config.asInstanceOf[CloudantChangesConfig]
+          .getStorageLevelForStreaming)
+
+        // Global RDD that's created from union of all RDDs
+        var globalRDD = ssc.sparkContext.emptyRDD[String]
+
+        logger.info("Loading data from Cloudant using "
+          + config.asInstanceOf[CloudantChangesConfig].getContinuousChangesUrl)
+
--- End diff --
@emlaver While trying to load a db here, I am getting the message "Loading
data from Cloudant using
https://XXXX.cloudant.com/n_airportcodemapping/_changes?include_docs=true&feed=continuous&heartbeat=3000".
We should NOT load data into Spark SQL using the `continuous` feed, which for a
constantly updating database may never end. The whole point of loading a db
into Spark SQL is to load a snapshot of the db at a particular point in time.
Use the `normal` feed here.
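
To illustrate the distinction, here is a minimal sketch of the two `_changes`
feed modes (the account and database names are placeholders taken from the log
message above, not real endpoints). `feed=continuous` holds the connection open
and streams updates indefinitely, while `feed=normal` returns the current
change set once and then closes, which is what a one-shot snapshot load needs:

```shell
# Hypothetical URLs illustrating the two CouchDB/Cloudant _changes feed modes.
# feed=continuous streams updates indefinitely -- unsuitable for a one-shot load.
# feed=normal returns a finite response and closes the connection.
DB_URL="https://XXXX.cloudant.com/n_airportcodemapping"
CONTINUOUS_FEED="${DB_URL}/_changes?include_docs=true&feed=continuous&heartbeat=3000"
NORMAL_FEED="${DB_URL}/_changes?include_docs=true&feed=normal"
echo "Snapshot load should use: ${NORMAL_FEED}"
```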