IGNITE-3215 - Added IgniteRDD.withKeepBinary method
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b90d182c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b90d182c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b90d182c Branch: refs/heads/ignite-3215 Commit: b90d182ca2dc675bf7fd1550ffda648911ed82d6 Parents: 98a0990 Author: Valentin Kulichenko <valentin.luliche...@gmail.com> Authored: Tue Jun 14 18:09:10 2016 +0300 Committer: Valentin Kulichenko <valentin.luliche...@gmail.com> Committed: Tue Jun 14 18:09:10 2016 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/spark/IgniteContext.scala | 4 +-- .../org/apache/ignite/spark/IgniteRDD.scala | 19 ++++++++--- .../apache/ignite/spark/JavaIgniteContext.scala | 4 +-- .../org/apache/ignite/spark/JavaIgniteRDD.scala | 2 ++ .../ignite/spark/impl/IgniteAbstractRDD.scala | 15 ++++++--- .../apache/ignite/spark/impl/IgniteSqlRDD.scala | 5 +-- .../spark/impl/JavaIgniteAbstractRDD.scala | 34 -------------------- .../org/apache/ignite/spark/IgniteRDDSpec.scala | 25 ++++++++++++-- 8 files changed, 58 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b90d182c/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala index bd61974..1084dbe 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala @@ -107,7 +107,7 @@ class IgniteContext[K, V]( * @return `IgniteRDD` instance. 
*/ def fromCache(cacheName: String): IgniteRDD[K, V] = { - new IgniteRDD[K, V](this, cacheName, null) + new IgniteRDD[K, V](this, cacheName, null, false) } /** @@ -118,7 +118,7 @@ class IgniteContext[K, V]( * @return `IgniteRDD` instance. */ def fromCache(cacheCfg: CacheConfiguration[K, V]) = { - new IgniteRDD[K, V](this, cacheCfg.getName, cacheCfg) + new IgniteRDD[K, V](this, cacheCfg.getName, cacheCfg, false) } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b90d182c/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala index fa96212..cad96b9 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala @@ -45,8 +45,9 @@ import scala.collection.JavaConversions._ class IgniteRDD[K, V] ( val ic: IgniteContext[K, V], val cacheName: String, - val cacheCfg: CacheConfiguration[K, V] -) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg) { + val cacheCfg: CacheConfiguration[K, V], + val keepBinary: Boolean +) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg, keepBinary) { /** * Computes iterator based on given partition. 
 * @@ -127,7 +128,8 @@ class IgniteRDD[K, V] ( qry.setArgs(args.map(_.asInstanceOf[Object]):_*) - new IgniteSqlRDD[(K, V), Cache.Entry[K, V], K, V](ic, cacheName, cacheCfg, qry, entry ⇒ (entry.getKey, entry.getValue)) + new IgniteSqlRDD[(K, V), Cache.Entry[K, V], K, V](ic, cacheName, cacheCfg, qry, + entry ⇒ (entry.getKey, entry.getValue), keepBinary) } /** @@ -144,7 +146,8 @@ class IgniteRDD[K, V] ( val schema = buildSchema(ensureCache().query(qry).asInstanceOf[QueryCursorEx[java.util.List[_]]].fieldsMeta()) - val rowRdd = new IgniteSqlRDD[Row, java.util.List[_], K, V](ic, cacheName, cacheCfg, qry, list ⇒ Row.fromSeq(list)) + val rowRdd = new IgniteSqlRDD[Row, java.util.List[_], K, V]( + ic, cacheName, cacheCfg, qry, list ⇒ Row.fromSeq(list), keepBinary) ic.sqlContext.createDataFrame(rowRdd, schema) } @@ -290,6 +293,14 @@ class IgniteRDD[K, V] ( ensureCache().removeAll() } + def withKeepBinary[K1, V1](): IgniteRDD[K1, V1] = { + new IgniteRDD[K1, V1]( + ic.asInstanceOf[IgniteContext[K1, V1]], + cacheName, + cacheCfg.asInstanceOf[CacheConfiguration[K1, V1]], + true) + } + /** * Builds spark schema from query metadata. 
* http://git-wip-us.apache.org/repos/asf/ignite/blob/b90d182c/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala index 44b1cd9..25184e7 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala @@ -52,10 +52,10 @@ class JavaIgniteContext[K, V]( } def fromCache(cacheName: String): JavaIgniteRDD[K, V] = - JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheName, null)) + JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheName, null, false)) def fromCache(cacheCfg: CacheConfiguration[K, V]) = - JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheCfg.getName, cacheCfg)) + JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheCfg.getName, cacheCfg, false)) def ignite(): Ignite = ic.ignite() http://git-wip-us.apache.org/repos/asf/ignite/blob/b90d182c/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala index cac0e15..1efc6ae 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala @@ -96,6 +96,8 @@ class JavaIgniteRDD[K, V](override val rdd: IgniteRDD[K, V]) savePairs(jrdd, f, overwrite = false) def clear(): Unit = rdd.clear() + + def withKeepBinary[K1, V1](): JavaIgniteRDD[K1, V1] = new JavaIgniteRDD[K1, V1](rdd.withKeepBinary[K1, V1]()) } object JavaIgniteRDD { 
http://git-wip-us.apache.org/repos/asf/ignite/blob/b90d182c/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala index 25b3b56..9d5171c 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala @@ -27,13 +27,20 @@ import scala.reflect.ClassTag abstract class IgniteAbstractRDD[R:ClassTag, K, V] ( ic: IgniteContext[K, V], cacheName: String, - cacheCfg: CacheConfiguration[K, V] + cacheCfg: CacheConfiguration[K, V], + keepBinary: Boolean ) extends RDD[R] (ic.sparkContext, deps = Nil) { protected def ensureCache(): IgniteCache[K, V] = { // Make sure to deploy the cache - if (cacheCfg != null) - ic.ignite().getOrCreateCache(cacheCfg) + val cache = + if (cacheCfg != null) + ic.ignite().getOrCreateCache(cacheCfg) + else + ic.ignite().getOrCreateCache(cacheName) + + if (keepBinary) + cache.withKeepBinary() else - ic.ignite().getOrCreateCache(cacheName) + cache.asInstanceOf[IgniteCache[K, V]] } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b90d182c/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala index 762a6ed..b4579aa 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala @@ -29,8 +29,9 @@ class IgniteSqlRDD[R: ClassTag, T, K, V]( cacheName: String, cacheCfg: CacheConfiguration[K, V], qry: 
Query[T], - conv: (T) ⇒ R -) extends IgniteAbstractRDD[R, K, V](ic, cacheName, cacheCfg) { + conv: (T) ⇒ R, + keepBinary: Boolean +) extends IgniteAbstractRDD[R, K, V](ic, cacheName, cacheCfg, keepBinary) { override def compute(split: Partition, context: TaskContext): Iterator[R] = { new IgniteQueryIterator[T, R](ensureCache().query(qry).iterator(), conv) } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b90d182c/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala deleted file mode 100644 index 13bd3e8..0000000 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.spark.impl - -import org.apache.ignite.IgniteCache -import org.apache.ignite.spark.IgniteRDD -import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike} - -abstract class JavaIgniteAbstractRDD[K, V](val rdd: IgniteRDD[K, V]) - extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] { - - protected def ensureCache(): IgniteCache[K, V] = { - // Make sure to deploy the cache - if (rdd.cacheCfg != null) - rdd.ic.ignite().getOrCreateCache(rdd.cacheCfg) - else - rdd.ic.ignite().getOrCreateCache(rdd.cacheName) - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/b90d182c/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala index 61040d9..8a5b355 100644 --- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala +++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala @@ -18,7 +18,7 @@ package org.apache.ignite.spark import org.apache.ignite.Ignition -import org.apache.ignite.cache.query.annotations.{QueryTextField, QuerySqlField} +import org.apache.ignite.cache.query.annotations.{QuerySqlField, QueryTextField} import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration} import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder @@ -26,9 +26,10 @@ import org.apache.spark.SparkContext import org.junit.runner.RunWith import org.scalatest._ import org.scalatest.junit.JUnitRunner -import scala.collection.JavaConversions._ +import scala.collection.JavaConversions._ import IgniteRDDSpec._ +import org.apache.ignite.binary.BinaryObject import scala.annotation.meta.field @@ -267,6 +268,26 @@ class IgniteRDDSpec extends FunSpec with Matchers with BeforeAndAfterAll with 
Be sc.stop() } } + + it("should properly work with binary objects") { + val sc = new SparkContext("local[*]", "test") + + try { + val ic = new IgniteContext[String, Entity](sc, () ⇒ configuration("client", client = true)) + + val cache = ic.fromCache(PARTITIONED_CACHE_NAME) + + cache.savePairs(sc.parallelize(0 until 10, 2).map(i ⇒ (String.valueOf(i), + new Entity(i, "name" + i, i * 100)))) + + val res = cache.withKeepBinary[String, BinaryObject]().map(t ⇒ t._2.field[Int]("salary")).collect() + + println(res) + } + finally { + sc.stop() + } + } } override protected def beforeEach() = {