So Spark + the JDBC thin client is the only way forward?
Reading:-
val df = spark.read
.format("jdbc")
.option("url", "jdbc:ignite:thin://18.206.247.40")
.option("fetchsize",100)
//.option("driver", "org.apache.ignite.IgniteJdbcDriver")
.option("dbtable", "Person2").load()
df.printSchema()
df.createOrReplaceTempView("test")
spark.sql("select * from test where id=1").show(10)
spark.sql("select 4,'blah',124232").show(10)
Writing:-

import java.sql.DriverManager
import java.util.Properties
import org.apache.spark.sql.SaveMode

val connection = DriverManager.getConnection("jdbc:ignite:thin://18.206.247.40")

val connectionProperties = new Properties()
connectionProperties.put("url", "jdbc:ignite:thin://18.206.247.40")

spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP")
  .write.mode(SaveMode.Append)
  .jdbc("jdbc:ignite:thin://18.206.247.40", "Person2", connectionProperties)
On Fri, Oct 18, 2019 at 12:00 PM sri hari kali charan Tummala <
[email protected]> wrote:
> unfortunately, tables that are created by Spark don't exist in Ignite when
> I try to query them using the sqlline JDBC thin client; it seems Spark is
> still running locally, and the tables created by Spark exist only for that
> session.
>
> Did anyone come across this issue? How can it be resolved?
>
>
>
>
>
> On Fri, Oct 18, 2019 at 11:32 AM sri hari kali charan Tummala <
> [email protected]> wrote:
>
>> Hi Stephen/All,
>>
>> got it working somewhat using the code below, but there is still an issue:
>> a table and its data created using the thin client fail to read using
>> Spark, while a table created by Spark can be read using the thin client.
>> Does that mean tables created in Ignite by Spark are the only ones Spark
>> can read from Ignite?
>>
>> example-default.xml
>>
>> <property name="addresses">
>> <list>
>> <!-- In distributed environment, replace with actual host IP
>> address. -->
>> <value>18.206.247.40:10800</value>
>> </list>
>> </property>
>>
>>
>> Not-working Scala function:-
>>
>> def readThinClientTableUsingSpark(implicit spark: SparkSession) = {
>> val personDataFrame = spark.read
>> .format(FORMAT_IGNITE)
>> .option(OPTION_CONFIG_FILE, CONFIG)
>> .option(OPTION_TABLE, "person")
>> .load()
>>
>> println()
>> println("Data frame thin connection content:person")
>> println()
>>
>> //Printing content of data frame to console.
>> personDataFrame.show()
>> }
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> Exception in thread "main" class org.apache.ignite.IgniteException: Unknown table person
>>     at org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46)
>>     at org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46)
>>     at scala.Option.getOrElse(Option.scala:121)
>>     at org.apache.ignite.spark.impl.IgniteSQLRelation.schema(IgniteSQLRelation.scala:46)
>>     at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:431)
>>     at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
>>     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
>>     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
>>     at com.ignite.examples.spark.SparkIgniteCleanCode$.readThinClientTableUsingSpark(SparkIgniteCleanCode.scala:154)
>>     at com.ignite.examples.spark.SparkIgniteCleanCode$.main(SparkIgniteCleanCode.scala:216)
>>     at com.ignite.examples.spark.SparkIgniteCleanCode.main(SparkIgniteCleanCode.scala)
>>
>> Full Code:- (step 7 fails)
>>
>> package com.ignite.examples.spark
>>
>> import com.ignite.examples.model.Address
>> import org.apache.ignite.{Ignite, Ignition}
>> import org.apache.ignite.cache.query.SqlFieldsQuery
>> import org.apache.ignite.client.{ClientCache, IgniteClient}
>> import org.apache.ignite.configuration.{CacheConfiguration,
>> ClientConfiguration}
>> import java.lang.{Long => JLong, String => JString}
>>
>> import com.ignite.examples.spark.SparkClientConnectionTest.{CACHE_NAME,
>> CONFIG}
>> import org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath
>> import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE,
>> OPTION_CONFIG_FILE, OPTION_CREATE_TABLE_PARAMETERS,
>> OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, OPTION_TABLE}
>> import org.apache.log4j.{Level, Logger}
>> import org.apache.spark.sql.{SaveMode, SparkSession}
>>
>> object SparkIgniteCleanCode {
>>
>> private val CACHE_NAME = "SparkCache"
>>
>> private val CONFIG =
>> "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"
>>
>> def setupExampleData = {
>>
>> val cfg2 = new ClientConfiguration().setAddresses("18.206.247.40:10800")
>> val igniteClient:IgniteClient = Ignition.startClient(cfg2)
>>
>> System.out.format(">>> Created cache [%s].\n", CACHE_NAME)
>>
>> val cache:ClientCache[Integer, Address] =
>> igniteClient.getOrCreateCache(CACHE_NAME)
>>
>> cache.query(new SqlFieldsQuery("DROP TABLE IF EXISTS Person")
>>   .setSchema("PUBLIC")).getAll
>>
>> cache.query(new SqlFieldsQuery(String.format(
>>   "CREATE TABLE IF NOT EXISTS Person (id LONG, street VARCHAR, zip VARCHAR, PRIMARY KEY (id)) WITH \"VALUE_TYPE=%s\"",
>>   classOf[Address].getName))
>>   .setSchema("PUBLIC")).getAll
>>
>> cache.query(new SqlFieldsQuery("INSERT INTO Person(id, street, zip) VALUES(?, ?, ?)")
>>   .setArgs(1L.asInstanceOf[JLong], "Jameco", "04074").setSchema("PUBLIC")).getAll
>> cache.query(new SqlFieldsQuery("INSERT INTO Person(id, street, zip) VALUES(?, ?, ?)")
>>   .setArgs(2L.asInstanceOf[JLong], "Bremar road", "520003").setSchema("PUBLIC")).getAll
>> cache.query(new SqlFieldsQuery("INSERT INTO Person(id, street, zip) VALUES(?, ?, ?)")
>>   .setArgs(3L.asInstanceOf[JLong], "orange road", "1234").setSchema("PUBLIC")).getAll
>>
>> System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME)
>>
>> val data = cache.query(new SqlFieldsQuery("select * from Person")
>>   .setSchema("PUBLIC")).getAll
>>
>> println(data.toString)
>> }
>>
>> def sparkReadIgniteWithThinClient(implicit spark: SparkSession)={
>>
>> val df = spark.read
>> .format("jdbc")
>> .option("url", "jdbc:ignite:thin://18.206.247.40")
>> .option("fetchsize",100)
>> //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
>> .option("dbtable", "Person").load()
>>
>> df.printSchema()
>>
>> df.createOrReplaceTempView("test")
>>
>> spark.sql("select * from test where id=1").show(10)
>>
>> spark.sql("select 4,'blah',124232").show(10)
>>
>> }
>>
>> def sparkWriteIgniteWithThinClient(implicit spark: SparkSession)={
>>
>> import java.sql.DriverManager
>> val connection = DriverManager.getConnection("jdbc:ignite:thin://18.206.247.40")
>>
>> import java.util.Properties
>> val connectionProperties = new Properties()
>>
>> connectionProperties.put("url", "jdbc:ignite:thin://18.206.247.40")
>>
>> spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP")
>>   .write.mode(SaveMode.Append)
>>   .jdbc("jdbc:ignite:thin://18.206.247.40", "Person", connectionProperties)
>>
>> spark.read
>> .format("jdbc")
>> .option("url", "jdbc:ignite:thin://18.206.247.40")
>> .option("fetchsize",100)
>> .option("dbtable", "Person").load().show(10,false)
>>
>> }
>>
>> def writeJSonToIgniteUsingSpark(implicit spark: SparkSession): Unit = {
>>
>> val ignite = Ignition.start(CONFIG)
>>
>> //Load content of json file to data frame.
>> val personsDataFrame = spark.read.json(
>>   resolveIgnitePath("/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/resources/person.json").getAbsolutePath)
>>
>> println()
>> println("Json file content:")
>> println()
>>
>> //Printing content of json file to console.
>> personsDataFrame.show()
>>
>> println()
>> println("Writing Data Frame to Ignite:")
>> println()
>>
>> //Writing content of data frame to Ignite.
>> personsDataFrame.write
>> .format(FORMAT_IGNITE)
>> .mode(org.apache.spark.sql.SaveMode.Append)
>> .option(OPTION_CONFIG_FILE, CONFIG)
>> .option(OPTION_TABLE, "json_person")
>> .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
>> .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
>> .save()
>>
>> println("Done!")
>>
>> println()
>> println("Reading data from Ignite table:")
>> println()
>>
>> val cache = ignite.cache[Any, Any](CACHE_NAME)
>>
>> //Reading saved data from Ignite.
>> val data = cache.query(new SqlFieldsQuery(
>>   "SELECT id, name, department FROM json_person")).getAll
>>
>> println(data.toString)
>>
>> //data.foreach { row ⇒ println(row.mkString("[", ", ", "]")) }
>> }
>>
>> def readIgniteUsingSpark(implicit spark: SparkSession) = {
>> val json_person = spark.read
>> .format(FORMAT_IGNITE)
>> .option(OPTION_CONFIG_FILE, CONFIG)
>> .option(OPTION_TABLE, "json_person")
>> .load()
>>
>> println()
>> println("Data frame content:json_person")
>> println()
>>
>> //Printing content of data frame to console.
>> json_person.show()
>> }
>>
>>
>> def readThinClientTableUsingSpark(implicit spark: SparkSession) = {
>> val personDataFrame = spark.read
>> .format(FORMAT_IGNITE)
>> .option(OPTION_CONFIG_FILE, CONFIG)
>> .option(OPTION_TABLE, "person")
>> .load()
>>
>> println()
>> println("Data frame thin connection content:person")
>> println()
>>
>> //Printing content of data frame to console.
>> personDataFrame.show()
>> }
>>
>>
>> def main(args: Array[String]): Unit = {
>>
>> println()
>> println("Step 1 setupExampleData:")
>> println()
>>
>> setupExampleData
>>
>> println()
>> println("Step 2 createSparkSession:")
>> println()
>>
>> //Creating spark session.
>> implicit val spark = SparkSession.builder()
>> .appName("Spark Ignite data sources example")
>> .master("local")
>> .config("spark.executor.instances", "2")
>> .getOrCreate()
>>
>> // Adjust the logger to exclude the logs of no interest.
>> Logger.getRootLogger.setLevel(Level.ERROR)
>> Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)
>>
>> println()
>> println("Step 3 ReadIgniteWithThinClient of Step1 Data:")
>> println()
>>
>> sparkReadIgniteWithThinClient(spark)
>>
>> println()
>> println("Step 4 sparkWriteIgniteWithThinClient of Step1 Data:")
>> println()
>>
>> sparkWriteIgniteWithThinClient(spark)
>>
>> println()
>> println("Step 5 writeJSonToIgniteUsingSpark Using Spark:")
>> println()
>>
>> writeJSonToIgniteUsingSpark(spark)
>>
>> println()
>> println("Step 6 readIgniteUsingSpark Using Spark:")
>> println()
>>
>> readIgniteUsingSpark(spark)
>>
>> println()
>> println("Step 7 readThinClientTableUsingSpark Using Spark:")
>> println()
>>
>> readThinClientTableUsingSpark(spark)
>>
>>
>> spark.stop()
>>
>>
>> }
>>
>> }
>>
>>
>> example-default.xml
>>
>> <?xml version="1.0" encoding="UTF-8"?>
>>
>> <!--
>> Licensed to the Apache Software Foundation (ASF) under one or more
>> contributor license agreements. See the NOTICE file distributed with
>> this work for additional information regarding copyright ownership.
>> The ASF licenses this file to You under the Apache License, Version 2.0
>> (the "License"); you may not use this file except in compliance with
>> the License. You may obtain a copy of the License at
>>
>> http://www.apache.org/licenses/LICENSE-2.0
>>
>> Unless required by applicable law or agreed to in writing, software
>> distributed under the License is distributed on an "AS IS" BASIS,
>> WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> See the License for the specific language governing permissions and
>> limitations under the License.
>> -->
>>
>> <!--
>> Ignite configuration with all defaults and enabled p2p deployment and
>> enabled events.
>> -->
>> <beans xmlns="http://www.springframework.org/schema/beans"
>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>> xmlns:util="http://www.springframework.org/schema/util"
>> xsi:schemaLocation="
>> http://www.springframework.org/schema/beans
>> http://www.springframework.org/schema/beans/spring-beans.xsd
>> http://www.springframework.org/schema/util
>> http://www.springframework.org/schema/util/spring-util.xsd">
>> <bean abstract="true" id="ignite.cfg"
>> class="org.apache.ignite.configuration.IgniteConfiguration">
>> <!-- Set to true to enable distributed class loading for examples,
>> default is false. -->
>> <property name="peerClassLoadingEnabled" value="true"/>
>>
>> <!-- Enable task execution events for examples. -->
>> <property name="includeEventTypes">
>> <list>
>> <!--Task execution events-->
>> <util:constant
>> static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
>> <util:constant
>> static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
>> <util:constant
>> static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
>> <util:constant
>> static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
>> <util:constant
>> static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
>> <util:constant
>> static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>
>>
>> <!--Cache events-->
>> <util:constant
>> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
>> <util:constant
>> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
>> <util:constant
>> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
>> </list>
>> </property>
>>
>> <!-- Explicitly configure TCP discovery SPI to provide list of
>> initial nodes. -->
>> <property name="discoverySpi">
>> <bean
>> class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
>> <property name="ipFinder">
>> <!--
>> Ignite provides several options for automatic
>> discovery that can be used
>> instead of static IP based discovery. For
>> information on all options refer
>> to our documentation:
>> http://apacheignite.readme.io/docs/cluster-config
>> -->
>> <!-- Uncomment static IP finder to enable static-based
>> discovery of initial nodes. -->
>> <!--<bean
>> class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
>> <bean
>> class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
>> <property name="addresses">
>> <list>
>> <!-- In distributed environment, replace
>> with actual host IP address. -->
>> <value>18.206.247.40:10800</value>
>> </list>
>> </property>
>> </bean>
>> </property>
>> </bean>
>> </property>
>> </bean>
>> </beans>
>>
>>
>> Thanks
>> Sri
>>
>> On Fri, Oct 18, 2019 at 9:23 AM sri hari kali charan Tummala <
>> [email protected]> wrote:
>>
>>> do you mean communication ports 47100-47200 as mentioned in (
>>> https://www.gridgain.com/docs/8.7.6//installation-guide/manual-install-on-ec2)
>>> ? bear in mind I am running my Spark job outside the Ignite EC2 box (on my Mac).
>>>
>>> which option is right in my default.xml file?
>>>
>>> Option 1:-
>>>
>>> <property name="addresses">
>>> <list>
>>> <!-- In distributed environment, replace
>>> with actual host IP address. -->
>>> <value>3.88.248.113:47100..47200</value>
>>> </list>
>>> </property>
>>>
>>>
>>> Option 2:-
>>>
>>> <property name="addresses">
>>> <list>
>>> <!-- In distributed environment, replace
>>> with actual host IP address. -->
>>> <value>3.88.248.113:47500..47600</value>
>>> </list>
>>> </property>
>>>
>>>
>>> Option 3:-
>>>
>>> <property name="addresses">
>>> <list>
>>> <!-- In distributed environment, replace
>>> with actual host IP address. -->
>>> <value>3.88.248.113</value>
>>> </list>
>>> </property>
>>>
>>>
>>>
>>> Thanks
>>>
>>> On Fri, Oct 18, 2019 at 9:18 AM sri hari kali charan Tummala <
>>> [email protected]> wrote:
>>>
>>>> Hi Stephen ,
>>>>
>>>> do you mean 3.88.248.113:47500..47700, something like this? or just the
>>>> public IP 3.88.248.113? I tried all the possibilities and none of them
>>>> connect.
>>>>
>>>> Thanks
>>>> Sri
>>>>
>>>> On Fri, Oct 18, 2019 at 6:02 AM Stephen Darlington <
>>>> [email protected]> wrote:
>>>>
>>>>> You’re trying to connect a thick client (the Spark integration) to the
>>>>> thin client port (10800). Your example-default.xml file needs to have the
>>>>> same configuration as your server node(s).
>>>>>
>>>>> Regards,
>>>>> Stephen
>>>>>
>>>>> On 17 Oct 2019, at 18:12, sri hari kali charan Tummala <
>>>>> [email protected]> wrote:
>>>>>
>>>>> Hi Community,
>>>>>
>>>>> I am trying to read from and write to an Ignite cluster using Apache
>>>>> Spark. I am able to do that using the JDBC thin client, but not with
>>>>> the native method mentioned in several Spark + Ignite examples.
>>>>>
>>>>> Right now all the Spark + Ignite examples launch a local Ignite
>>>>> cluster, but I want my code to connect to an already existing cluster
>>>>> as a client.
>>>>>
>>>>> Question:-
>>>>> How do I pass the Ignite connection IP and port (10800) in
>>>>> example-default.xml?
>>>>>
>>>>> Error:-
>>>>> TcpDiscoverySpi: Failed to connect to any address from IP finder
>>>>> (will retry to join topology every 2000 ms; change 'reconnectDelay' to
>>>>> configure the frequency of retries): [/3.88.248.113:10800]
>>>>>
>>>>> Working (Spark + Ignite using JDBC):-
>>>>>
>>>>> val df = spark.read
>>>>> .format("jdbc")
>>>>> .option("url", "jdbc:ignite:thin://3.88.248.113")
>>>>> .option("fetchsize",100)
>>>>> //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
>>>>> .option("dbtable", "Person").load()
>>>>>
>>>>> df.printSchema()
>>>>>
>>>>> df.createOrReplaceTempView("test")
>>>>>
>>>>> spark.sql("select * from test where id=1").show(10)
>>>>>
>>>>> spark.sql("select 4,'blah',124232").show(10)
>>>>>
>>>>> import java.sql.DriverManager
>>>>> val connection = DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")
>>>>>
>>>>> import java.util.Properties
>>>>> val connectionProperties = new Properties()
>>>>>
>>>>> connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")
>>>>>
>>>>> spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP")
>>>>>   .write.mode(SaveMode.Append)
>>>>>   .jdbc("jdbc:ignite:thin://3.88.248.113", "Person", connectionProperties)
>>>>>
>>>>> spark.read
>>>>> .format("jdbc")
>>>>> .option("url", "jdbc:ignite:thin://3.88.248.113")
>>>>> .option("fetchsize",100)
>>>>> .option("dbtable", "Person").load().show(10,false)
>>>>>
>>>>>
>>>>> Not working (requires a CONFIG file, which is example-default.xml):-
>>>>>
>>>>> val igniteDF = spark.read
>>>>> .format(FORMAT_IGNITE) //Data source type.
>>>>> .option(OPTION_TABLE, "person") //Table to read.
>>>>> .option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
>>>>> .load()
>>>>> .filter(col("id") >= 2) //Filter clause.
>>>>> .filter(col("name") like "%J%") //Another filter clause.
>>>>>
>>>>>
>>>>> Full Code:- (the sparkDSLExample function fails to connect to the
>>>>> Ignite cluster I already have)
>>>>>
>>>>> package com.ignite.examples.spark
>>>>>
>>>>> import com.ignite.examples.model.Address
>>>>> import org.apache.ignite.{Ignite, Ignition}
>>>>> import org.apache.ignite.cache.query.SqlFieldsQuery
>>>>> import org.apache.ignite.client.{ClientCache, IgniteClient}
>>>>> import org.apache.ignite.configuration.{CacheConfiguration,
>>>>> ClientConfiguration}
>>>>> import java.lang.{Long => JLong, String => JString}
>>>>>
>>>>> import org.apache.ignite.cache.query.SqlFieldsQuery
>>>>> import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE,
>>>>> OPTION_CONFIG_FILE, OPTION_TABLE}
>>>>> import org.apache.log4j.{Level, Logger}
>>>>> import org.apache.spark.sql.{SaveMode, SparkSession}
>>>>> import org.apache.spark.sql.functions.col
>>>>>
>>>>> object SparkClientConnectionTest {
>>>>>
>>>>> private val CACHE_NAME = "SparkCache"
>>>>>
>>>>> private val CONFIG =
>>>>> "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"
>>>>>
>>>>> def setupExampleData = {
>>>>>
>>>>> val cfg2 = new ClientConfiguration().setAddresses("3.88.248.113:10800")
>>>>> val igniteClient:IgniteClient = Ignition.startClient(cfg2)
>>>>>
>>>>> System.out.format(">>> Created cache [%s].\n", CACHE_NAME)
>>>>>
>>>>> val cache:ClientCache[Integer, Address] =
>>>>> igniteClient.getOrCreateCache(CACHE_NAME)
>>>>>
>>>>> cache.query(new SqlFieldsQuery("DROP TABLE IF EXISTS Person")
>>>>>   .setSchema("PUBLIC")).getAll
>>>>>
>>>>> cache.query(new SqlFieldsQuery(String.format(
>>>>>   "CREATE TABLE IF NOT EXISTS Person (id LONG, street VARCHAR, zip VARCHAR, PRIMARY KEY (id)) WITH \"VALUE_TYPE=%s\"",
>>>>>   classOf[Address].getName))
>>>>>   .setSchema("PUBLIC")).getAll
>>>>>
>>>>> cache.query(new SqlFieldsQuery("INSERT INTO Person(id, street, zip) VALUES(?, ?, ?)")
>>>>>   .setArgs(1L.asInstanceOf[JLong], "Jameco", "04074").setSchema("PUBLIC")).getAll
>>>>> cache.query(new SqlFieldsQuery("INSERT INTO Person(id, street, zip) VALUES(?, ?, ?)")
>>>>>   .setArgs(2L.asInstanceOf[JLong], "Bremar road", "520003").setSchema("PUBLIC")).getAll
>>>>> cache.query(new SqlFieldsQuery("INSERT INTO Person(id, street, zip) VALUES(?, ?, ?)")
>>>>>   .setArgs(3L.asInstanceOf[JLong], "orange road", "1234").setSchema("PUBLIC")).getAll
>>>>>
>>>>> System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME)
>>>>>
>>>>> val data = cache.query(new SqlFieldsQuery("select * from Person")
>>>>>   .setSchema("PUBLIC")).getAll
>>>>>
>>>>> println(data.toString)
>>>>> }
>>>>>
>>>>> def sparkDSLExample(implicit spark: SparkSession): Unit = {
>>>>> println("Querying using Spark DSL.")
>>>>> println
>>>>>
>>>>>
>>>>> val igniteDF = spark.read
>>>>> .format(FORMAT_IGNITE) //Data source type.
>>>>> .option(OPTION_TABLE, "person") //Table to read.
>>>>> .option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
>>>>> .load()
>>>>> .filter(col("id") >= 2) //Filter clause.
>>>>> .filter(col("name") like "%J%") //Another filter clause.
>>>>>
>>>>> println("Data frame schema:")
>>>>>
>>>>> igniteDF.printSchema() //Printing query schema to console.
>>>>>
>>>>> println("Data frame content:")
>>>>>
>>>>> igniteDF.show() //Printing query results to console.
>>>>> }
>>>>>
>>>>>
>>>>> def main(args: Array[String]): Unit = {
>>>>>
>>>>> setupExampleData
>>>>>
>>>>> //Creating spark session.
>>>>> implicit val spark = SparkSession.builder()
>>>>> .appName("Spark Ignite data sources example")
>>>>> .master("local")
>>>>> .config("spark.executor.instances", "2")
>>>>> .getOrCreate()
>>>>>
>>>>> // Adjust the logger to exclude the logs of no interest.
>>>>> Logger.getRootLogger.setLevel(Level.ERROR)
>>>>> Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)
>>>>>
>>>>> //sparkDSLExample
>>>>>
>>>>>
>>>>> val df = spark.read
>>>>> .format("jdbc")
>>>>> .option("url", "jdbc:ignite:thin://3.88.248.113")
>>>>> .option("fetchsize",100)
>>>>> //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
>>>>> .option("dbtable", "Person").load()
>>>>>
>>>>> df.printSchema()
>>>>>
>>>>> df.createOrReplaceTempView("test")
>>>>>
>>>>> spark.sql("select * from test where id=1").show(10)
>>>>>
>>>>> spark.sql("select 4,'blah',124232").show(10)
>>>>>
>>>>> import java.sql.DriverManager
>>>>> val connection = DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")
>>>>>
>>>>> import java.util.Properties
>>>>> val connectionProperties = new Properties()
>>>>>
>>>>> connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")
>>>>>
>>>>> spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP")
>>>>>   .write.mode(SaveMode.Append)
>>>>>   .jdbc("jdbc:ignite:thin://3.88.248.113", "Person", connectionProperties)
>>>>>
>>>>> spark.read
>>>>> .format("jdbc")
>>>>> .option("url", "jdbc:ignite:thin://3.88.248.113")
>>>>> .option("fetchsize",100)
>>>>> .option("dbtable", "Person").load().show(10,false)
>>>>>
>>>>> }
>>>>>
>>>>> }
>>>>>
>>>>>
>>>>> example-default.xml:-
>>>>>
>>>>> <?xml version="1.0" encoding="UTF-8"?>
>>>>>
>>>>> <!--
>>>>> Licensed to the Apache Software Foundation (ASF) under one or more
>>>>> contributor license agreements. See the NOTICE file distributed with
>>>>> this work for additional information regarding copyright ownership.
>>>>> The ASF licenses this file to You under the Apache License, Version 2.0
>>>>> (the "License"); you may not use this file except in compliance with
>>>>> the License. You may obtain a copy of the License at
>>>>>
>>>>> http://www.apache.org/licenses/LICENSE-2.0
>>>>>
>>>>> Unless required by applicable law or agreed to in writing, software
>>>>> distributed under the License is distributed on an "AS IS" BASIS,
>>>>> WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>>>> See the License for the specific language governing permissions and
>>>>> limitations under the License.
>>>>> -->
>>>>>
>>>>> <!--
>>>>> Ignite configuration with all defaults and enabled p2p deployment and
>>>>> enabled events.
>>>>> -->
>>>>> <beans xmlns="http://www.springframework.org/schema/beans"
>>>>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>>> xmlns:util="http://www.springframework.org/schema/util"
>>>>> xsi:schemaLocation="
>>>>> http://www.springframework.org/schema/beans
>>>>> http://www.springframework.org/schema/beans/spring-beans.xsd
>>>>> http://www.springframework.org/schema/util
>>>>> http://www.springframework.org/schema/util/spring-util.xsd">
>>>>> <bean abstract="true" id="ignite.cfg"
>>>>> class="org.apache.ignite.configuration.IgniteConfiguration">
>>>>> <!-- Set to true to enable distributed class loading for
>>>>> examples, default is false. -->
>>>>> <property name="peerClassLoadingEnabled" value="true"/>
>>>>>
>>>>> <!-- Enable task execution events for examples. -->
>>>>> <property name="includeEventTypes">
>>>>> <list>
>>>>> <!--Task execution events-->
>>>>> <util:constant
>>>>> static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
>>>>> <util:constant
>>>>> static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
>>>>> <util:constant
>>>>> static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
>>>>> <util:constant
>>>>> static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
>>>>> <util:constant
>>>>> static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
>>>>> <util:constant
>>>>> static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>
>>>>>
>>>>> <!--Cache events-->
>>>>> <util:constant
>>>>> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
>>>>> <util:constant
>>>>> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
>>>>> <util:constant
>>>>> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
>>>>> </list>
>>>>> </property>
>>>>>
>>>>> <!-- Explicitly configure TCP discovery SPI to provide list of
>>>>> initial nodes. -->
>>>>> <property name="discoverySpi">
>>>>> <bean
>>>>> class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
>>>>> <property name="ipFinder">
>>>>> <!--
>>>>> Ignite provides several options for automatic
>>>>> discovery that can be used
>>>>> instead of static IP based discovery. For
>>>>> information on all options refer
>>>>> to our documentation:
>>>>> http://apacheignite.readme.io/docs/cluster-config
>>>>> -->
>>>>> <!-- Uncomment static IP finder to enable
>>>>> static-based discovery of initial nodes. -->
>>>>> <!--<bean
>>>>> class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
>>>>> <bean
>>>>> class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
>>>>> <property name="addresses">
>>>>> <list>
>>>>> <!-- In distributed environment, replace
>>>>> with actual host IP address. -->
>>>>> <value>3.88.248.113:10800</value>
>>>>> </list>
>>>>> </property>
>>>>> </bean>
>>>>> </property>
>>>>> </bean>
>>>>> </property>
>>>>> </bean>
>>>>> </beans>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Thanks & Regards
>>>>> Sri Tummala
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>> --
>>>> Thanks & Regards
>>>> Sri Tummala
>>>>
>>>>
>>>
>>> --
>>> Thanks & Regards
>>> Sri Tummala
>>>
>>>
>>
>> --
>> Thanks & Regards
>> Sri Tummala
>>
>>
>
> --
> Thanks & Regards
> Sri Tummala
>
>
--
Thanks & Regards
Sri Tummala