so spark + jdbc thin client is the only way to go forward?

 Reading:-

val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:ignite:thin://18.206.247.40")
  .option("fetchsize",100)
  //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
  .option("dbtable", "Person2").load()

df.printSchema()

df.createOrReplaceTempView("test")

spark.sql("select * from test where id=1").show(10)

spark.sql("select 4,'blah',124232").show(10)


Writing:-

import java.sql.DriverManager
val connection = DriverManager.getConnection("jdbc:ignite:thin://18.206.247.40")

import java.util.Properties
val connectionProperties = new Properties()

connectionProperties.put("url", "jdbc:ignite:thin://18.206.247.40")

spark.sql("select 4 as ID,'blah' as STREET,124232 as
ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://18.206.247.40",
  "Person2",connectionProperties)


On Fri, Oct 18, 2019 at 12:00 PM sri hari kali charan Tummala <
[email protected]> wrote:

> unfortunately, tables that are created by spark don't exist in ignite
> when I try to query using sqlline jdbc thin client, so the spark is still
> running locally the tables which are created by spark exists only for that
> session.
>
> did anyone come across this issue? how to resolve it?
>
>
>
>
>
> On Fri, Oct 18, 2019 at 11:32 AM sri hari kali charan Tummala <
> [email protected]> wrote:
>
>> Hi Stephen/All,
>>
>> got it working somewhat using below, but have an issue table and data
>> which is created using thin client is failing to read using spark but table
>> created by spark can be read using a thin client, that means table created
>> in Ignite using spark are the only ones read using spark in Ignite?
>>
>> example-default.xml
>>
>> <property name="addresses">
>>     <list>
>>         <!-- In distributed environment, replace with actual host IP 
>> address. -->
>>         <value>18.206.247.40:10800</value>
>>     </list>
>> </property>
>>
>>
>> NotWorking Scala Function:-
>>
>> def readThinClientTableUsingSpark(implicit spark: SparkSession) = {
>>   val personDataFrame = spark.read
>>     .format(FORMAT_IGNITE)
>>     .option(OPTION_CONFIG_FILE, CONFIG)
>>     .option(OPTION_TABLE, "person")
>>     .load()
>>
>>   println()
>>   println("Data frame thin connection content:person")
>>   println()
>>
>>   //Printing content of data frame to console.
>>   personDataFrame.show()
>> }
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> *Exception in thread "main" class org.apache.ignite.IgniteException:
>> Unknown table person at
>> org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46)
>> at
>> org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46)
>> at scala.Option.getOrElse(Option.scala:121) at
>> org.apache.ignite.spark.impl.IgniteSQLRelation.schema(IgniteSQLRelation.scala:46)
>> at
>> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:431)
>> at
>> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
>> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227) at
>> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164) at
>> com.ignite.examples.spark.SparkIgniteCleanCode$.readThinClientTableUsingSpark(SparkIgniteCleanCode.scala:154)
>> at
>> com.ignite.examples.spark.SparkIgniteCleanCode$.main(SparkIgniteCleanCode.scala:216)
>> at
>> com.ignite.examples.spark.SparkIgniteCleanCode.main(SparkIgniteCleanCode.scala)*
>>
>> *Full Code:- (step 7 Fails)*
>>
>> package com.ignite.examples.spark
>>
>> import com.ignite.examples.model.Address
>> import org.apache.ignite.{Ignite, Ignition}
>> import org.apache.ignite.cache.query.SqlFieldsQuery
>> import org.apache.ignite.client.{ClientCache, IgniteClient}
>> import org.apache.ignite.configuration.{CacheConfiguration, 
>> ClientConfiguration}
>> import java.lang.{Long => JLong, String => JString}
>>
>> import com.ignite.examples.spark.SparkClientConnectionTest.{CACHE_NAME, 
>> CONFIG}
>> import org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath
>> import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, 
>> OPTION_CONFIG_FILE, OPTION_CREATE_TABLE_PARAMETERS, 
>> OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, OPTION_TABLE}
>> import org.apache.log4j.{Level, Logger}
>> import org.apache.spark.sql.{SaveMode, SparkSession}
>>
>> object SparkIgniteCleanCode {
>>
>>   private val CACHE_NAME = "SparkCache"
>>
>>   private val CONFIG = 
>> "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"
>>
>>   def setupExampleData = {
>>
>>     val cfg2 = new ClientConfiguration().setAddresses("18.206.247.40:10800")
>>     val igniteClient:IgniteClient = Ignition.startClient(cfg2)
>>
>>     System.out.format(">>> Created cache [%s].\n", CACHE_NAME)
>>
>>     val cache:ClientCache[Integer, Address] = 
>> igniteClient.getOrCreateCache(CACHE_NAME)
>>
>>     cache.query(new SqlFieldsQuery(String.format("DROP TABLE IF EXISTS 
>> Person"))
>>       .setSchema("PUBLIC")).getAll
>>
>>     cache.query(new SqlFieldsQuery(String.format("CREATE TABLE IF NOT EXISTS 
>> Person (id LONG,street varchar, zip VARCHAR, PRIMARY KEY (id) ) WITH 
>> \"VALUE_TYPE=%s\"", classOf[Address].getName))
>>       .setSchema("PUBLIC")).getAll
>>
>>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) 
>> VALUES(?,?, ?)").setArgs(1L.asInstanceOf[JLong],"Jameco", 
>> "04074").setSchema("PUBLIC")).getAll
>>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) 
>> VALUES(?,?, ?)").setArgs(2L.asInstanceOf[JLong],"Bremar road", 
>> "520003").setSchema("PUBLIC")).getAll
>>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) 
>> VALUES(?,?, ?)").setArgs(3L.asInstanceOf[JLong],"orange road", 
>> "1234").setSchema("PUBLIC")).getAll
>>
>>     System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME)
>>
>>     val data=cache.query(new SqlFieldsQuery("select * from 
>> Person").setSchema("PUBLIC")).getAll
>>
>>     println(data.toString)
>>   }
>>
>>   def sparkReadIgniteWithThinClient(implicit spark: SparkSession)={
>>
>>     val df = spark.read
>>       .format("jdbc")
>>       .option("url", "jdbc:ignite:thin://18.206.247.40")
>>       .option("fetchsize",100)
>>       //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
>>       .option("dbtable", "Person").load()
>>
>>     df.printSchema()
>>
>>     df.createOrReplaceTempView("test")
>>
>>     spark.sql("select * from test where id=1").show(10)
>>
>>     spark.sql("select 4,'blah',124232").show(10)
>>
>>   }
>>
>>   def sparkWriteIgniteWithThinClient(implicit spark: SparkSession)={
>>
>>     import java.sql.DriverManager
>>     val connection = 
>> DriverManager.getConnection("jdbc:ignite:thin://18.206.247.40")
>>
>>     import java.util.Properties
>>     val connectionProperties = new Properties()
>>
>>     connectionProperties.put("url", "jdbc:ignite:thin://18.206.247.40")
>>
>>     spark.sql("select 4 as ID,'blah' as STREET,124232 as 
>> ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://18.206.247.40",
>>       "Person",connectionProperties)
>>
>>     spark.read
>>       .format("jdbc")
>>       .option("url", "jdbc:ignite:thin://18.206.247.40")
>>       .option("fetchsize",100)
>>       .option("dbtable", "Person").load().show(10,false)
>>
>>   }
>>
>>   def writeJSonToIgniteUsingSpark(implicit spark: SparkSession): Unit = {
>>
>>     val ignite = Ignition.start(CONFIG)
>>
>>     //Load content of json file to data frame.
>>     val personsDataFrame = spark.read.json(
>>       
>> resolveIgnitePath("/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/resources/person.json").getAbsolutePath)
>>
>>     println()
>>     println("Json file content:")
>>     println()
>>
>>     //Printing content of json file to console.
>>     personsDataFrame.show()
>>
>>     println()
>>     println("Writing Data Frame to Ignite:")
>>     println()
>>
>>     //Writing content of data frame to Ignite.
>>     personsDataFrame.write
>>       .format(FORMAT_IGNITE)
>>       .mode(org.apache.spark.sql.SaveMode.Append)
>>       .option(OPTION_CONFIG_FILE, CONFIG)
>>       .option(OPTION_TABLE, "json_person")
>>       .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
>>       .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
>>       .save()
>>
>>     println("Done!")
>>
>>     println()
>>     println("Reading data from Ignite table:")
>>     println()
>>
>>     val cache = ignite.cache[Any, Any](CACHE_NAME)
>>
>>     //Reading saved data from Ignite.
>>     val data = cache.query(new SqlFieldsQuery("SELECT id, name, department 
>> FROM json_person")).getAll
>>
>>     println(data.toString)
>>
>>     //data.foreach { row ⇒ println(row.mkString("[", ", ", "]")) }
>>   }
>>
>>   def readIgniteUsingSpark(implicit spark: SparkSession) = {
>>     val json_person = spark.read
>>       .format(FORMAT_IGNITE)
>>       .option(OPTION_CONFIG_FILE, CONFIG)
>>       .option(OPTION_TABLE, "json_person")
>>       .load()
>>
>>     println()
>>     println("Data frame content:json_person")
>>     println()
>>
>>     //Printing content of data frame to console.
>>     json_person.show()
>>   }
>>
>>
>>   def readThinClientTableUsingSpark(implicit spark: SparkSession) = {
>>     val personDataFrame = spark.read
>>       .format(FORMAT_IGNITE)
>>       .option(OPTION_CONFIG_FILE, CONFIG)
>>       .option(OPTION_TABLE, "person")
>>       .load()
>>
>>     println()
>>     println("Data frame thin connection content:person")
>>     println()
>>
>>     //Printing content of data frame to console.
>>     personDataFrame.show()
>>   }
>>
>>
>>   def main(args: Array[String]): Unit = {
>>
>>     println()
>>     println("Step 1 setupExampleData:")
>>     println()
>>
>>     setupExampleData
>>
>>     println()
>>     println("Step 2 createSparkSession:")
>>     println()
>>
>>     //Creating spark session.
>>     implicit val spark = SparkSession.builder()
>>       .appName("Spark Ignite data sources example")
>>       .master("local")
>>       .config("spark.executor.instances", "2")
>>       .getOrCreate()
>>
>>     // Adjust the logger to exclude the logs of no interest.
>>     Logger.getRootLogger.setLevel(Level.ERROR)
>>     Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)
>>
>>     println()
>>     println("Step 3 ReadIgniteWithThinClient of Step1 Data:")
>>     println()
>>
>>     sparkReadIgniteWithThinClient(spark)
>>
>>     println()
>>     println("Step 4 sparkWriteIgniteWithThinClient of Step1 Data:")
>>     println()
>>
>>     sparkWriteIgniteWithThinClient(spark)
>>
>>     println()
>>     println("Step 5 writeJSonToIgniteUsingSpark Using Spark:")
>>     println()
>>
>>     writeJSonToIgniteUsingSpark(spark)
>>
>>     println()
>>     println("Step 6 readIgniteUsingSpark Using Spark:")
>>     println()
>>
>>     readIgniteUsingSpark(spark)
>>
>>     println()
>>     println("Step 7 readThinClientTableUsingSpark Using Spark:")
>>     println()
>>
>>     readThinClientTableUsingSpark(spark)
>>
>>
>>     spark.stop()
>>
>>
>>   }
>>
>> }
>>
>>
>> example-default.xml
>>
>> <?xml version="1.0" encoding="UTF-8"?>
>>
>> <!--
>>   Licensed to the Apache Software Foundation (ASF) under one or more
>>   contributor license agreements.  See the NOTICE file distributed with
>>   this work for additional information regarding copyright ownership.
>>   The ASF licenses this file to You under the Apache License, Version 2.0
>>   (the "License"); you may not use this file except in compliance with
>>   the License.  You may obtain a copy of the License at
>>
>>        http://www.apache.org/licenses/LICENSE-2.0
>>
>>   Unless required by applicable law or agreed to in writing, software
>>   distributed under the License is distributed on an "AS IS" BASIS,
>>   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>   See the License for the specific language governing permissions and
>>   limitations under the License.
>> -->
>>
>> <!--
>>     Ignite configuration with all defaults and enabled p2p deployment and 
>> enabled events.
>> -->
>> <beans xmlns="http://www.springframework.org/schema/beans";
>>        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
>>        xmlns:util="http://www.springframework.org/schema/util";
>>        xsi:schemaLocation="
>>         http://www.springframework.org/schema/beans
>>         http://www.springframework.org/schema/beans/spring-beans.xsd
>>         http://www.springframework.org/schema/util
>>         http://www.springframework.org/schema/util/spring-util.xsd";>
>>     <bean abstract="true" id="ignite.cfg" 
>> class="org.apache.ignite.configuration.IgniteConfiguration">
>>         <!-- Set to true to enable distributed class loading for examples, 
>> default is false. -->
>>         <property name="peerClassLoadingEnabled" value="true"/>
>>
>>         <!-- Enable task execution events for examples. -->
>>         <property name="includeEventTypes">
>>             <list>
>>                 <!--Task execution events-->
>>                 <util:constant 
>> static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
>>                 <util:constant 
>> static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
>>                 <util:constant 
>> static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
>>                 <util:constant 
>> static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
>>                 <util:constant 
>> static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
>>                 <util:constant 
>> static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>
>>
>>                 <!--Cache events-->
>>                 <util:constant 
>> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
>>                 <util:constant 
>> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
>>                 <util:constant 
>> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
>>             </list>
>>         </property>
>>
>>         <!-- Explicitly configure TCP discovery SPI to provide list of 
>> initial nodes. -->
>>         <property name="discoverySpi">
>>             <bean 
>> class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
>>                 <property name="ipFinder">
>>                     <!--
>>                         Ignite provides several options for automatic 
>> discovery that can be used
>>                         instead os static IP based discovery. For 
>> information on all options refer
>>                         to our documentation: 
>> http://apacheignite.readme.io/docs/cluster-config
>>                     -->
>>                     <!-- Uncomment static IP finder to enable static-based 
>> discovery of initial nodes. -->
>>                     <!--<bean 
>> class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
>>                     <bean 
>> class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
>>                         <property name="addresses">
>>                             <list>
>>                                 <!-- In distributed environment, replace 
>> with actual host IP address. -->
>>                                 <value>18.206.247.40:10800</value>
>>                             </list>
>>                         </property>
>>                     </bean>
>>                 </property>
>>             </bean>
>>         </property>
>>     </bean>
>> </beans>
>>
>>
>> Thanks
>> Sri
>>
>> On Fri, Oct 18, 2019 at 9:23 AM sri hari kali charan Tummala <
>> [email protected]> wrote:
>>
>>> do you mean communication ports 47100-47200 as mentioned (
>>> https://www.gridgain.com/docs/8.7.6//installation-guide/manual-install-on-ec2)
>>> ? bare in mind I am running my spark job outside ignite ec2 box (MAC PC).
>>>
>>> which option is right in my default.xml file?
>>>
>>> Option 1:-
>>>
>>>                         <property name="addresses">
>>>                             <list>
>>>                                 <!-- In distributed environment, replace 
>>> with actual host IP address. -->
>>>                                 <value>3.88.248.113:4 
>>> <http://3.88.248.113:10800/>7100..47200</value>
>>>                             </list>
>>>                         </property>
>>>
>>>
>>> Option 2:-
>>>
>>>                         <property name="addresses">
>>>                             <list>
>>>                                 <!-- In distributed environment, replace 
>>> with actual host IP address. -->
>>>                                 <value>3.88.248.113:4 
>>> <http://3.88.248.113:10800/>7500..47600</value>
>>>                             </list>
>>>                         </property>
>>>
>>>
>>> Option 3:-
>>>
>>>                         <property name="addresses">
>>>                             <list>
>>>                                 <!-- In distributed environment, replace 
>>> with actual host IP address. -->
>>>                                 <value>3.88.248.113 
>>> <http://3.88.248.113:10800/></value>
>>>                             </list>
>>>                         </property>
>>>
>>>
>>>
>>> Thanks
>>>
>>> On Fri, Oct 18, 2019 at 9:18 AM sri hari kali charan Tummala <
>>> [email protected]> wrote:
>>>
>>>> Hi Stephen ,
>>>>
>>>>  do you mean 3.88.248.113: <http://3.88.248.113:10800/>47500..47700
>>>> something like this? or just public ip 3.88.248.113
>>>> <http://3.88.248.113:10800/> I tried all the possibilities none of
>>>> them are getting connected.
>>>>
>>>> Thanks
>>>> Sri
>>>>
>>>> On Fri, Oct 18, 2019 at 6:02 AM Stephen Darlington <
>>>> [email protected]> wrote:
>>>>
>>>>> You’re trying to connect a thick client (the Spark integration) to the
>>>>> thin client port (10800). Your example-default.xml file needs to have the
>>>>> same configuration as your server node(s).
>>>>>
>>>>> Regards,
>>>>> Stephen
>>>>>
>>>>> On 17 Oct 2019, at 18:12, sri hari kali charan Tummala <
>>>>> [email protected]> wrote:
>>>>>
>>>>> Hi Community,
>>>>>
>>>>> I am trying to read and write into the Ignite cluster using
>>>>> apache-spark I am able to do that using JDBC thin client but not native
>>>>> method as mentioned in several spark + ignite examples.
>>>>>
>>>>> Right now all the spark + ignite examples launch a local ignite
>>>>> cluster but I want my code connecting to already existing cluster 
>>>>> (client).
>>>>>
>>>>> Question:-
>>>>> *How to pass Ignite connection ip and port (10800)  10800 in
>>>>> example-default.xml ?*
>>>>>
>>>>> Error:-
>>>>> *TcpDiscoverySpi: Failed to connect to any address from IP finder
>>>>> (will retry to join topology every 2000 ms; change 'reconnectDelay' to
>>>>> configure the frequency of retries): [/3.88.248.113:10800
>>>>> <http://3.88.248.113:10800/>]*
>>>>>
>>>>> Working (Spark + Ignite using JDBC):-
>>>>>
>>>>> val df = spark.read
>>>>> .format("jdbc")
>>>>> .option("url", "jdbc:ignite:thin://3.88.248.113")
>>>>> .option("fetchsize",100)
>>>>> //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
>>>>> .option("dbtable", "Person").load()
>>>>>
>>>>> df.printSchema()
>>>>>
>>>>> df.createOrReplaceTempView("test")
>>>>>
>>>>> spark.sql("select * from test where id=1").show(10)
>>>>>
>>>>> spark.sql("select 4,'blah',124232").show(10)
>>>>>
>>>>> import java.sql.DriverManager
>>>>> val connection = 
>>>>> DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")
>>>>>
>>>>> import java.util.Properties
>>>>> val connectionProperties = new Properties()
>>>>>
>>>>> connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")
>>>>>
>>>>> spark.sql("select 4 as ID,'blah' as STREET,124232 as 
>>>>> ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://3.88.248.113",
>>>>>   "Person",connectionProperties)
>>>>>
>>>>> spark.read
>>>>>   .format("jdbc")
>>>>>   .option("url", "jdbc:ignite:thin://3.88.248.113")
>>>>>   .option("fetchsize",100)
>>>>>   .option("dbtable", "Person").load().show(10,false)
>>>>>
>>>>>
>>>>> Not Working requires a CONFIG file which is example-default.xml:-
>>>>>
>>>>> val igniteDF = spark.read
>>>>>   .format(FORMAT_IGNITE) //Data source type.
>>>>>   .option(OPTION_TABLE, "person") //Table to read.
>>>>>   .option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
>>>>>   .load()
>>>>>   .filter(col("id") >= 2) //Filter clause.
>>>>>   .filter(col("name") like "%J%") //Another filter clause.
>>>>>
>>>>>
>>>>> Full Code:- (sparkDSLExample) function fails to connect ignite cluster
>>>>> which I already have
>>>>>
>>>>> package com.ignite.examples.spark
>>>>>
>>>>> import com.ignite.examples.model.Address
>>>>> import org.apache.ignite.{Ignite, Ignition}
>>>>> import org.apache.ignite.cache.query.SqlFieldsQuery
>>>>> import org.apache.ignite.client.{ClientCache, IgniteClient}
>>>>> import org.apache.ignite.configuration.{CacheConfiguration, 
>>>>> ClientConfiguration}
>>>>> import java.lang.{Long => JLong, String => JString}
>>>>>
>>>>> import org.apache.ignite.cache.query.SqlFieldsQuery
>>>>> import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, 
>>>>> OPTION_CONFIG_FILE, OPTION_TABLE}
>>>>> import org.apache.log4j.{Level, Logger}
>>>>> import org.apache.spark.sql.{SaveMode, SparkSession}
>>>>> import org.apache.spark.sql.functions.col
>>>>>
>>>>> object SparkClientConnectionTest {
>>>>>
>>>>>   private val CACHE_NAME = "SparkCache"
>>>>>
>>>>>   private val CONFIG = 
>>>>> "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"
>>>>>
>>>>>   def setupExampleData = {
>>>>>
>>>>>     val cfg2 = new 
>>>>> ClientConfiguration().setAddresses("3.88.248.113:10800")
>>>>>     val igniteClient:IgniteClient = Ignition.startClient(cfg2)
>>>>>
>>>>>     System.out.format(">>> Created cache [%s].\n", CACHE_NAME)
>>>>>
>>>>>     val cache:ClientCache[Integer, Address] = 
>>>>> igniteClient.getOrCreateCache(CACHE_NAME)
>>>>>
>>>>>     cache.query(new SqlFieldsQuery(String.format("DROP TABLE IF EXISTS 
>>>>> Person"))
>>>>>       .setSchema("PUBLIC")).getAll
>>>>>
>>>>>     cache.query(new SqlFieldsQuery(String.format("CREATE TABLE IF NOT 
>>>>> EXISTS Person (id LONG,street varchar, zip VARCHAR, PRIMARY KEY (id) ) 
>>>>> WITH \"VALUE_TYPE=%s\"", classOf[Address].getName))
>>>>>       .setSchema("PUBLIC")).getAll
>>>>>
>>>>>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) 
>>>>> VALUES(?,?, ?)").setArgs(1L.asInstanceOf[JLong],"Jameco", 
>>>>> "04074").setSchema("PUBLIC")).getAll
>>>>>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) 
>>>>> VALUES(?,?, ?)").setArgs(2L.asInstanceOf[JLong],"Bremar road", 
>>>>> "520003").setSchema("PUBLIC")).getAll
>>>>>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) 
>>>>> VALUES(?,?, ?)").setArgs(3L.asInstanceOf[JLong],"orange road", 
>>>>> "1234").setSchema("PUBLIC")).getAll
>>>>>
>>>>>     System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME)
>>>>>
>>>>>     val data=cache.query(new SqlFieldsQuery("select * from 
>>>>> Person").setSchema("PUBLIC")).getAll
>>>>>
>>>>>     println(data.toString)
>>>>>   }
>>>>>
>>>>>   def sparkDSLExample(implicit spark: SparkSession): Unit = {
>>>>>     println("Querying using Spark DSL.")
>>>>>     println
>>>>>
>>>>>
>>>>>     val igniteDF = spark.read
>>>>>       .format(FORMAT_IGNITE) //Data source type.
>>>>>       .option(OPTION_TABLE, "person") //Table to read.
>>>>>       .option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
>>>>>       .load()
>>>>>       .filter(col("id") >= 2) //Filter clause.
>>>>>       .filter(col("name") like "%J%") //Another filter clause.
>>>>>
>>>>>     println("Data frame schema:")
>>>>>
>>>>>     igniteDF.printSchema() //Printing query schema to console.
>>>>>
>>>>>     println("Data frame content:")
>>>>>
>>>>>     igniteDF.show() //Printing query results to console.
>>>>>   }
>>>>>
>>>>>
>>>>>   def main(args: Array[String]): Unit = {
>>>>>
>>>>>     setupExampleData
>>>>>
>>>>>     //Creating spark session.
>>>>>     implicit val spark = SparkSession.builder()
>>>>>       .appName("Spark Ignite data sources example")
>>>>>       .master("local")
>>>>>       .config("spark.executor.instances", "2")
>>>>>       .getOrCreate()
>>>>>
>>>>>     // Adjust the logger to exclude the logs of no interest.
>>>>>     Logger.getRootLogger.setLevel(Level.ERROR)
>>>>>     Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)
>>>>>
>>>>>     //sparkDSLExample
>>>>>
>>>>>
>>>>>     val df = spark.read
>>>>>     .format("jdbc")
>>>>>     .option("url", "jdbc:ignite:thin://3.88.248.113")
>>>>>     .option("fetchsize",100)
>>>>>     //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
>>>>>     .option("dbtable", "Person").load()
>>>>>
>>>>>     df.printSchema()
>>>>>
>>>>>     df.createOrReplaceTempView("test")
>>>>>
>>>>>     spark.sql("select * from test where id=1").show(10)
>>>>>
>>>>>     spark.sql("select 4,'blah',124232").show(10)
>>>>>
>>>>>     import java.sql.DriverManager
>>>>>     val connection = 
>>>>> DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")
>>>>>
>>>>>     import java.util.Properties
>>>>>     val connectionProperties = new Properties()
>>>>>
>>>>>     connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")
>>>>>
>>>>>     spark.sql("select 4 as ID,'blah' as STREET,124232 as 
>>>>> ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://3.88.248.113",
>>>>>       "Person",connectionProperties)
>>>>>
>>>>>     spark.read
>>>>>       .format("jdbc")
>>>>>       .option("url", "jdbc:ignite:thin://3.88.248.113")
>>>>>       .option("fetchsize",100)
>>>>>       .option("dbtable", "Person").load().show(10,false)
>>>>>
>>>>>   }
>>>>>
>>>>> }
>>>>>
>>>>>
>>>>> example-default.xml:-
>>>>>
>>>>> <?xml version="1.0" encoding="UTF-8"?>
>>>>>
>>>>> <!--
>>>>>   Licensed to the Apache Software Foundation (ASF) under one or more
>>>>>   contributor license agreements.  See the NOTICE file distributed with
>>>>>   this work for additional information regarding copyright ownership.
>>>>>   The ASF licenses this file to You under the Apache License, Version 2.0
>>>>>   (the "License"); you may not use this file except in compliance with
>>>>>   the License.  You may obtain a copy of the License at
>>>>>
>>>>>        http://www.apache.org/licenses/LICENSE-2.0
>>>>>
>>>>>   Unless required by applicable law or agreed to in writing, software
>>>>>   distributed under the License is distributed on an "AS IS" BASIS,
>>>>>   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>>>>   See the License for the specific language governing permissions and
>>>>>   limitations under the License.
>>>>> -->
>>>>>
>>>>> <!--
>>>>>     Ignite configuration with all defaults and enabled p2p deployment and 
>>>>> enabled events.
>>>>> -->
>>>>> <beans xmlns="http://www.springframework.org/schema/beans";
>>>>>        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
>>>>>        xmlns:util="http://www.springframework.org/schema/util";
>>>>>        xsi:schemaLocation="
>>>>>         http://www.springframework.org/schema/beans
>>>>>         http://www.springframework.org/schema/beans/spring-beans.xsd
>>>>>         http://www.springframework.org/schema/util
>>>>>         http://www.springframework.org/schema/util/spring-util.xsd";>
>>>>>     <bean abstract="true" id="ignite.cfg" 
>>>>> class="org.apache.ignite.configuration.IgniteConfiguration">
>>>>>         <!-- Set to true to enable distributed class loading for 
>>>>> examples, default is false. -->
>>>>>         <property name="peerClassLoadingEnabled" value="true"/>
>>>>>
>>>>>         <!-- Enable task execution events for examples. -->
>>>>>         <property name="includeEventTypes">
>>>>>             <list>
>>>>>                 <!--Task execution events-->
>>>>>                 <util:constant 
>>>>> static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
>>>>>                 <util:constant 
>>>>> static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
>>>>>                 <util:constant 
>>>>> static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
>>>>>                 <util:constant 
>>>>> static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
>>>>>                 <util:constant 
>>>>> static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
>>>>>                 <util:constant 
>>>>> static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>
>>>>>
>>>>>                 <!--Cache events-->
>>>>>                 <util:constant 
>>>>> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
>>>>>                 <util:constant 
>>>>> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
>>>>>                 <util:constant 
>>>>> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
>>>>>             </list>
>>>>>         </property>
>>>>>
>>>>>         <!-- Explicitly configure TCP discovery SPI to provide list of 
>>>>> initial nodes. -->
>>>>>         <property name="discoverySpi">
>>>>>             <bean 
>>>>> class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
>>>>>                 <property name="ipFinder">
>>>>>                     <!--
>>>>>                         Ignite provides several options for automatic 
>>>>> discovery that can be used
>>>>>                         instead os static IP based discovery. For 
>>>>> information on all options refer
>>>>>                         to our documentation: 
>>>>> http://apacheignite.readme.io/docs/cluster-config
>>>>>                     -->
>>>>>                     <!-- Uncomment static IP finder to enable 
>>>>> static-based discovery of initial nodes. -->
>>>>>                     <!--<bean 
>>>>> class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
>>>>>                     <bean 
>>>>> class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
>>>>>                         <property name="addresses">
>>>>>                             <list>
>>>>>                                 <!-- In distributed environment, replace 
>>>>> with actual host IP address. -->
>>>>>                                 <value>3.88.248.113:10800</value>
>>>>>                             </list>
>>>>>                         </property>
>>>>>                     </bean>
>>>>>                 </property>
>>>>>             </bean>
>>>>>         </property>
>>>>>     </bean>
>>>>> </beans>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Thanks & Regards
>>>>> Sri Tummala
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>> --
>>>> Thanks & Regards
>>>> Sri Tummala
>>>>
>>>>
>>>
>>> --
>>> Thanks & Regards
>>> Sri Tummala
>>>
>>>
>>
>> --
>> Thanks & Regards
>> Sri Tummala
>>
>>
>
> --
> Thanks & Regards
> Sri Tummala
>
>

-- 
Thanks & Regards
Sri Tummala

Reply via email to