This is an automated email from the ASF dual-hosted git repository.

zaleslaw pushed a commit to branch ignite-11723
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 1fa9f2cd3e3d79442dd2f3e27ed8e86291a8b9d1
Author: Alexey Zinoviev <zaleslaw....@gmail.com>
AuthorDate: Mon Sep 30 16:53:52 2019 +0300

    IGNITE-11723: fixed property
---
 .../apache/ignite/spark/IgniteDataFrameSettings.scala | 19 +++++++++++++++++++
 .../scala/org/apache/ignite/spark/IgniteRDD.scala     | 11 +++++++----
 .../scala/org/apache/ignite/spark/JavaIgniteRDD.scala | 13 +++++++------
 .../ignite/spark/impl/IgniteRelationProvider.scala    |  3 +++
 .../org/apache/ignite/spark/impl/QueryHelper.scala    |  9 +++++++--
 .../ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java   |  6 +++---
 6 files changed, 46 insertions(+), 15 deletions(-)

diff --git 
a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
 
b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
index e176721..4e0abf4 100644
--- 
a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
+++ 
b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
@@ -120,6 +120,25 @@ object IgniteDataFrameSettings {
     /**
       * Config option for saving data frame.
       * Internally all SQL inserts are done through `IgniteDataStreamer`.
+      * This options sets `skipStore` property of streamer.
+      * If `true` then write-through behavior will be disabled for data 
streaming.
+      * If `false` then write-through behavior will be enabled for data 
streaming.
+      * Default value if `false`.
+      *
+      * @example {{{
+      *           val igniteDF = spark.write.format(IGNITE)
+      *               // other options ...
+      *               .option(OPTION_STREAMER_SKIP_STORE, true)
+      *               .save()
+      *          }}}
+      * @see [[org.apache.ignite.IgniteDataStreamer]]
+      * @see [[org.apache.ignite.IgniteDataStreamer#skipStore(boolean)]]
+      */
+    val OPTION_STREAMER_SKIP_STORE = "streamerSkipStore"
+
+    /**
+      * Config option for saving data frame.
+      * Internally all SQL inserts are done through `IgniteDataStreamer`.
       * This options sets `autoFlushFrequency` property of streamer.
       *
       * @example {{{
diff --git 
a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala 
b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index 5fb81b6..0c38566 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -17,7 +17,6 @@
 package org.apache.ignite.spark
 
 import javax.cache.Cache
-
 import org.apache.ignite.cache.query._
 import org.apache.ignite.cluster.ClusterNode
 import org.apache.ignite.configuration.CacheConfiguration
@@ -228,8 +227,9 @@ class IgniteRDD[K, V] (
      * @param rdd RDD instance to save values from.
      * @param overwrite Boolean flag indicating whether the call on this 
method should overwrite existing
      *      values in Ignite cache.
+     * @param skipStore Sets flag indicating that write-through behavior 
should be disabled for data streaming.
      */
-    def savePairs(rdd: RDD[(K, V)], overwrite: Boolean = false) = {
+    def savePairs(rdd: RDD[(K, V)], overwrite: Boolean = false, skipStore: 
Boolean = false) = {
         rdd.foreachPartition(it ⇒ {
             val ig = ic.ignite()
 
@@ -240,6 +240,7 @@ class IgniteRDD[K, V] (
 
             try {
                 streamer.allowOverwrite(overwrite)
+                streamer.skipStore(skipStore)
 
                 it.foreach(tup ⇒ {
                     streamer.addData(tup._1, tup._2)
@@ -258,8 +259,9 @@ class IgniteRDD[K, V] (
      * @param f Transformation function.
      * @param overwrite Boolean flag indicating whether the call on this 
method should overwrite existing
      *      values in Ignite cache.
+      * @param skipStore Sets flag indicating that write-through behavior 
should be disabled for data streaming.
      */
-    def savePairs[T](rdd: RDD[T], f: (T, IgniteContext) ⇒ (K, V), overwrite: 
Boolean) = {
+    def savePairs[T](rdd: RDD[T], f: (T, IgniteContext) ⇒ (K, V), overwrite: 
Boolean, skipStore: Boolean) = {
         rdd.foreachPartition(it ⇒ {
             val ig = ic.ignite()
 
@@ -270,6 +272,7 @@ class IgniteRDD[K, V] (
 
             try {
                 streamer.allowOverwrite(overwrite)
+                streamer.skipStore(skipStore)
 
                 it.foreach(t ⇒ {
                     val tup = f(t, ic)
@@ -290,7 +293,7 @@ class IgniteRDD[K, V] (
      * @param f Transformation function.
      */
     def savePairs[T](rdd: RDD[T], f: (T, IgniteContext) ⇒ (K, V)): Unit = {
-        savePairs(rdd, f, overwrite = false)
+        savePairs(rdd, f, overwrite = false, skipStore = false)
     }
 
     /**
diff --git 
a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala 
b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
index a44cb51..1937483 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
@@ -82,20 +82,21 @@ class JavaIgniteRDD[K, V](override val rdd: IgniteRDD[K, V])
 
     def saveValues[T](jrdd: JavaRDD[T], f: (T, IgniteContext) ⇒ V) = 
rdd.saveValues(JavaRDD.toRDD(jrdd), f)
 
-    def savePairs(jrdd: JavaPairRDD[K, V], overwrite: Boolean) = {
+    def savePairs(jrdd: JavaPairRDD[K, V], overwrite: Boolean, skipStore: 
Boolean) = {
         val rrdd: RDD[(K, V)] = JavaPairRDD.toRDD(jrdd)
 
-        rdd.savePairs(rrdd, overwrite)
+        rdd.savePairs(rrdd, overwrite, skipStore)
     }
 
-    def savePairs(jrdd: JavaPairRDD[K, V]) : Unit = savePairs(jrdd, overwrite 
= false)
+    def savePairs(jrdd: JavaPairRDD[K, V]): Unit = savePairs(jrdd, overwrite = 
false, skipStore = false)
 
-    def savePairs[T](jrdd: JavaRDD[T], f: (T, IgniteContext) ⇒ (K, V), 
overwrite: Boolean = false) = {
-        rdd.savePairs(JavaRDD.toRDD(jrdd), f, overwrite)
+    def savePairs[T](jrdd: JavaRDD[T], f: (T, IgniteContext) ⇒ (K, V), 
overwrite: Boolean = false,
+        skipStore: Boolean = false) = {
+        rdd.savePairs(JavaRDD.toRDD(jrdd), f, overwrite, skipStore)
     }
 
     def savePairs[T](jrdd: JavaRDD[T], f: (T, IgniteContext) ⇒ (K, V)): Unit =
-        savePairs(jrdd, f, overwrite = false)
+        savePairs(jrdd, f, overwrite = false, skipStore = false)
 
     def clear(): Unit = rdd.clear()
 
diff --git 
a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
 
b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
index 039ca63..a4f6da1 100644
--- 
a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
+++ 
b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
@@ -125,6 +125,7 @@ class IgniteRelationProvider extends RelationProvider
                         params.get(OPTION_SCHEMA),
                         ctx,
                         
params.get(OPTION_STREAMER_ALLOW_OVERWRITE).map(_.toBoolean),
+                        
params.get(OPTION_STREAMER_SKIP_STORE).map(_.toBoolean),
                         
params.get(OPTION_STREAMER_FLUSH_FREQUENCY).map(_.toLong),
                         
params.get(OPTION_STREAMER_PER_NODE_BUFFER_SIZE).map(_.toInt),
                         
params.get(OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS).map(_.toInt))
@@ -135,6 +136,7 @@ class IgniteRelationProvider extends RelationProvider
                         params.get(OPTION_SCHEMA),
                         ctx,
                         
params.get(OPTION_STREAMER_ALLOW_OVERWRITE).map(_.toBoolean),
+                        
params.get(OPTION_STREAMER_SKIP_STORE).map(_.toBoolean),
                         
params.get(OPTION_STREAMER_FLUSH_FREQUENCY).map(_.toLong),
                         
params.get(OPTION_STREAMER_PER_NODE_BUFFER_SIZE).map(_.toInt),
                         
params.get(OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS).map(_.toInt))
@@ -166,6 +168,7 @@ class IgniteRelationProvider extends RelationProvider
                 params.get(OPTION_SCHEMA),
                 ctx,
                 params.get(OPTION_STREAMER_ALLOW_OVERWRITE).map(_.toBoolean),
+                params.get(OPTION_STREAMER_SKIP_STORE).map(_.toBoolean),
                 params.get(OPTION_STREAMER_FLUSH_FREQUENCY).map(_.toLong),
                 params.get(OPTION_STREAMER_PER_NODE_BUFFER_SIZE).map(_.toInt),
                 
params.get(OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS).map(_.toInt))
diff --git 
a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala 
b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
index f752b1a..d123b01 100644
--- 
a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
+++ 
b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
@@ -18,12 +18,12 @@
 package org.apache.ignite.spark.impl
 
 import org.apache.ignite.cache.query.SqlFieldsQuery
-import org.apache.ignite.spark.IgniteDataFrameSettings._
-import QueryUtils.{compileCreateTable, compileDropTable, compileInsert}
 import org.apache.ignite.internal.IgniteEx
 import org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl
 import org.apache.ignite.internal.processors.query.QueryUtils.DFLT_SCHEMA
 import org.apache.ignite.spark.IgniteContext
+import org.apache.ignite.spark.IgniteDataFrameSettings._
+import org.apache.ignite.spark.impl.QueryUtils.{compileCreateTable, 
compileDropTable, compileInsert}
 import org.apache.ignite.{Ignite, IgniteException}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, Row}
@@ -116,6 +116,7 @@ private[apache] object QueryHelper {
         schemaName: Option[String],
         ctx: IgniteContext,
         streamerAllowOverwrite: Option[Boolean],
+        streamerSkipStore: Option[Boolean],
         streamerFlushFrequency: Option[Long],
         streamerPerNodeBufferSize: Option[Int],
         streamerPerNodeParallelOperations: Option[Int]
@@ -129,6 +130,7 @@ private[apache] object QueryHelper {
                 schemaName,
                 ctx,
                 streamerAllowOverwrite,
+                streamerSkipStore,
                 streamerFlushFrequency,
                 streamerPerNodeBufferSize,
                 streamerPerNodeParallelOperations
@@ -160,6 +162,7 @@ private[apache] object QueryHelper {
         schemaName: Option[String],
         ctx: IgniteContext,
         streamerAllowOverwrite: Option[Boolean],
+        streamerSkipStore: Option[Boolean],
         streamerFlushFrequency: Option[Long],
         streamerPerNodeBufferSize: Option[Int],
         streamerPerNodeParallelOperations: Option[Int]
@@ -170,6 +173,8 @@ private[apache] object QueryHelper {
 
         streamerAllowOverwrite.foreach(v ⇒ streamer.allowOverwrite(v))
 
+        streamerSkipStore.foreach(v ⇒ streamer.skipStore(v))
+
         streamerFlushFrequency.foreach(v ⇒ streamer.autoFlushFrequency(v))
 
         streamerPerNodeBufferSize.foreach(v ⇒ streamer.perNodeBufferSize(v))
diff --git 
a/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
 
b/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
index bf256c6..fbd4363 100644
--- 
a/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
+++ 
b/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
@@ -129,7 +129,7 @@ public class JavaEmbeddedIgniteRDDSelfTest extends 
GridCommonAbstractTest {
             ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider(), 
false);
 
             ic.fromCache(PARTITIONED_CACHE_NAME)
-                .savePairs(sc.parallelize(F.range(0, KEYS_CNT), 
GRID_CNT).mapToPair(TO_PAIR_F), true);
+                .savePairs(sc.parallelize(F.range(0, KEYS_CNT), 
GRID_CNT).mapToPair(TO_PAIR_F), true, false);
 
             Ignite ignite = ic.ignite();
 
@@ -200,7 +200,7 @@ public class JavaEmbeddedIgniteRDDSelfTest extends 
GridCommonAbstractTest {
             JavaIgniteRDD<String, Entity> cache = 
ic.fromCache(PARTITIONED_CACHE_NAME);
 
             int cnt = 1001;
-            cache.savePairs(sc.parallelize(F.range(0, cnt), 
GRID_CNT).mapToPair(INT_TO_ENTITY_F), true);
+            cache.savePairs(sc.parallelize(F.range(0, cnt), 
GRID_CNT).mapToPair(INT_TO_ENTITY_F), true, false);
 
             List<Entity> res = cache.objectSql("Entity", "name = ? and salary 
= ?", "name50", 5000)
                 .map(STR_ENTITY_PAIR_TO_ENTITY_F).collect();
@@ -238,7 +238,7 @@ public class JavaEmbeddedIgniteRDDSelfTest extends 
GridCommonAbstractTest {
 
             JavaIgniteRDD<String, Entity> cache = 
ic.fromCache(PARTITIONED_CACHE_NAME);
 
-            cache.savePairs(sc.parallelize(F.range(0, 1001), 
GRID_CNT).mapToPair(INT_TO_ENTITY_F), true);
+            cache.savePairs(sc.parallelize(F.range(0, 1001), 
GRID_CNT).mapToPair(INT_TO_ENTITY_F), true, false);
 
             Dataset<Row> df =
                 cache.sql("select id, name, salary from Entity where name = ? 
and salary = ?", "name50", 5000);

Reply via email to