I got it so I have to build a jar move to aws ignite cluster node and run it on the node.
Thanks Sri On Thursday, October 24, 2019, Denis Magda <[email protected]> wrote: > Sri, > > That's wrong to assume that the Ignite thin client is the only way to > access data stored in AWS or other cloud environments. The problems you are > experiencing are caused by the fact that your application is running on the > laptop while the cluster (server nodes) is within an AWS network. > > If you use a thick/regular client, then servers running on AWS need to > have a way to *open* network connections to the application. This won't > happen if the laptop or dev environment doesn't have public IPs reachable > from AWS. Plus, additional network configuration might be needed. > > Overall, thick clients work with AWS, but either an application needs to > be deployed to AWS during development, or you need to start a local cluster > in your network/premises first, accomplish development and move on with > testing in AWS. A network configuration is also an option but a tricky one > and might not be doable. > > Thin clients are a solution as well if key-value and SQL APIs are all you > need. > > - > Denis > > > On Sun, Oct 20, 2019 at 2:06 PM sri hari kali charan Tummala < > [email protected]> wrote: > > so I got an answer from Grid Grain computing, JDBC thin client is the only > way yo connect the ignite cluster which is running on aws. > > https://forums.gridgain.com/community-home/digestviewer/ > viewthread?MessageKey=13f5e836-1569-486a-8475-84c70fc141e0&CommunityKey= > 3b551477-7e2d-462f-bc5f-d6d10ccbbe35&tab=digestviewer& > SuccessMsg=Thank+you+for+submitting+your+message. > > Thanks > Sri > > On Fri, Oct 18, 2019 at 1:38 PM sri hari kali charan Tummala < > [email protected]> wrote: > > Ok I created a 2 node ec2 instance ignite cluster is below is the right > way to create cache? the code still using my laptop resources unable to > connect ignite cluster. > > Out put:- > [13:36:36] Ignite node started OK (id=8535f4d3) > [13:36:36] Topology snapshot [ver=30, locNode=8535f4d3, *servers=1,* > clients=1, state=ACTIVE, CPUs=8, *offheap=6.4GB, heap=14.0GB*] > >>> cache acquired > > package com.ignite.examples.igniteStartup > > import java.util.Arrays > import java.util.List > import com.ignite.examples.model.Address > import org.apache.commons.logging.Log > import org.apache.commons.logging.LogFactory > import org.apache.ignite.Ignite > import org.apache.ignite.IgniteCache > import org.apache.ignite.Ignition > import org.apache.ignite.configuration.IgniteConfiguration > import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi > import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder > > //remove if not needed > import scala.collection.JavaConversions._ > > object IgniteStart2 { > > private var cache: IgniteCache[String, Address] = _ > > def main(args: Array[String]): Unit = { > > val spi: TcpDiscoverySpi = new TcpDiscoverySpi() > val ipFinder: TcpDiscoveryVmIpFinder = new TcpDiscoveryVmIpFinder() > val hostList: List[String] = > Arrays.asList("127.0.0.1:47500..47509,18.206.247.40:47500..47509,52.207.217.31:47500..47509".split(","): > _*) > ipFinder.setAddresses(hostList) > spi.setIpFinder(ipFinder) > val cfg: IgniteConfiguration = new IgniteConfiguration() > cfg.setDiscoverySpi(spi) > cfg.setClientMode(true) > System.out.println(">>> I am here") > > cfg.setPeerClassLoadingEnabled(true) > val ignite: Ignite = Ignition.start(cfg) > cache = Ignition.ignite().cache("test") > System.out.println(">>> cache acquired") > > System.exit(0) > > > } > > } > > > On Fri, Oct 18, 2019 at 12:56 PM sri hari kali charan Tummala < > [email protected]> wrote: > > Hi Stephen , > > I followed below steps and created one node ec2 instance with Ignite on it > now I am connecting to the ignite cluster using spark in two ways 1) thin > client 2) I don't know you have to tell me > > my question is what is the value I have to pass for option 2 is it ec2 > instance public IP with port 47500..47509 in my example-default.xml file ? > > https://www.gridgain.com/docs/8.7.6//installation-guide/ > manual-install-on-ec2 > > Thanks > Sri > > On Fri, Oct 18, 2019 at 12:47 PM Stephen Darlington < > [email protected]> wrote: > > You’re still pointing your Spark node (*thick* client) at port 10800 (the > *thin* client port). This is not going to work. > > You can create a table using sqlline and read it using Spark, or vice > versa. But you need a functioning cluster. > > Check out the documentation on clustering concepts and configuration: > https://www.gridgain.com/docs/latest/developers-guide/clustering/ > clustering > > On 18 Oct 2019, at 16:32, sri hari kali charan Tummala < > [email protected]> wrote: > > Hi Stephen/All, > > got it working somewhat using below, but have an issue table and data > which is created using thin client is failing to read using spark but table > created by spark can be read using a thin client, that means table created > in Ignite using spark are the only ones read using spark in Ignite? > > example-default.xml > > <property name="addresses"> > <list> > <!-- In distributed environment, replace with actual host IP address. > --> > <value>18.206.247.40:10800</value> > </list> > </property> > > > NotWorking Scala Function:- > > def readThinClientTableUsingSpark(implicit spark: SparkSession) = { > val personDataFrame = spark.read > .format(FORMAT_IGNITE) > .option(OPTION_CONFIG_FILE, CONFIG) > .option(OPTION_TABLE, "person") > .load() > > println() > println("Data frame thin connection content:person") > println() > > //Printing content of data frame to console. > personDataFrame.show() > } > > > > > > > > > > > > > > *Exception in thread "main" class org.apache.ignite.IgniteException: > Unknown table person at > org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46) > at > org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46) > at scala.Option.getOrElse(Option.scala:121) at > org.apache.ignite.spark.impl.IgniteSQLRelation.schema(IgniteSQLRelation.scala:46) > at > org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:431) > at > org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239) > at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227) at > org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164) at > com.ignite.examples.spark.SparkIgniteCleanCode$.readThinClientTableUsingSpark(SparkIgniteCleanCode.scala:154) > at > com.ignite.examples.spark.SparkIgniteCleanCode$.main(SparkIgniteCleanCode.scala:216) > at > com.ignite.examples.spark.SparkIgniteCleanCode.main(SparkIgniteCleanCode.scala)* > > *Full Code:- (step 7 Fails)* > > package com.ignite.examples.spark > > import com.ignite.examples.model.Address > import org.apache.ignite.{Ignite, Ignition} > import org.apache.ignite.cache.query.SqlFieldsQuery > import org.apache.ignite.client.{ClientCache, IgniteClient} > import org.apache.ignite.configuration.{CacheConfiguration, > ClientConfiguration} > import java.lang.{Long => JLong, String => JString} > > import com.ignite.examples.spark.SparkClientConnectionTest.{CACHE_NAME, > CONFIG} > import org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath > import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, > OPTION_CONFIG_FILE, OPTION_CREATE_TABLE_PARAMETERS, > OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, OPTION_TABLE} > import org.apache.log4j.{Level, Logger} > import org.apache.spark.sql.{SaveMode, SparkSession} > > object SparkIgniteCleanCode { > > private val CACHE_NAME = "SparkCache" > > private val CONFIG = > "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml" > > def setupExampleData = { > > val cfg2 = new ClientConfiguration().setAddresses("18.206.247.40:10800") > val igniteClient:IgniteClient = Ignition.startClient(cfg2) > > System.out.format(">>> Created cache [%s].\n", CACHE_NAME) > > val cache:ClientCache[Integer, Address] = > igniteClient.getOrCreateCache(CACHE_NAME) > > cache.query(new SqlFieldsQuery(String.format("DROP TABLE IF EXISTS > Person")) > .setSchema("PUBLIC")).getAll > > cache.query(new SqlFieldsQuery(String.format("CREATE TABLE IF NOT EXISTS > Person (id LONG,street varchar, zip VARCHAR, PRIMARY KEY (id) ) WITH > \"VALUE_TYPE=%s\"", classOf[Address].getName)) > .setSchema("PUBLIC")).getAll > > cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) > VALUES(?,?, ?)").setArgs(1L.asInstanceOf[JLong],"Jameco", > "04074").setSchema("PUBLIC")).getAll > cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) > VALUES(?,?, ?)").setArgs(2L.asInstanceOf[JLong],"Bremar road", > "520003").setSchema("PUBLIC")).getAll > cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) > VALUES(?,?, ?)").setArgs(3L.asInstanceOf[JLong],"orange road", > "1234").setSchema("PUBLIC")).getAll > > System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME) > > val data=cache.query(new SqlFieldsQuery("select * from > Person").setSchema("PUBLIC")).getAll > > println(data.toString) > } > > def sparkReadIgniteWithThinClient(implicit spark: SparkSession)={ > > val df = spark.read > .format("jdbc") > .option("url", "jdbc:ignite:thin://18.206.247.40") > .option("fetchsize",100) > //.option("driver", "org.apache.ignite.IgniteJdbcDriver") > .option("dbtable", "Person").load() > > df.printSchema() > > df.createOrReplaceTempView("test") > > spark.sql("select * from test where id=1").show(10) > > spark.sql("select 4,'blah',124232").show(10) > > } > > def sparkWriteIgniteWithThinClient(implicit spark: SparkSession)={ > > import java.sql.DriverManager > val connection = > DriverManager.getConnection("jdbc:ignite:thin://18.206.247.40") > > import java.util.Properties > val connectionProperties = new Properties() > > connectionProperties.put("url", "jdbc:ignite:thin://18.206.247.40") > > spark.sql("select 4 as ID,'blah' as STREET,124232 as > ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://18.206.247.40", > "Person",connectionProperties) > > spark.read > .format("jdbc") > .option("url", "jdbc:ignite:thin://18.206.247.40") > .option("fetchsize",100) > .option("dbtable", "Person").load().show(10,false) > > } > > def writeJSonToIgniteUsingSpark(implicit spark: SparkSession): Unit = { > > val ignite = Ignition.start(CONFIG) > > //Load content of json file to data frame. > val personsDataFrame = spark.read.json( > > resolveIgnitePath("/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/resources/person.json").getAbsolutePath) > > println() > println("Json file content:") > println() > > //Printing content of json file to console. > personsDataFrame.show() > > println() > println("Writing Data Frame to Ignite:") > println() > > //Writing content of data frame to Ignite. > personsDataFrame.write > .format(FORMAT_IGNITE) > .mode(org.apache.spark.sql.SaveMode.Append) > .option(OPTION_CONFIG_FILE, CONFIG) > .option(OPTION_TABLE, "json_person") > .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id") > .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated") > .save() > > println("Done!") > > println() > println("Reading data from Ignite table:") > println() > > val cache = ignite.cache[Any, Any](CACHE_NAME) > > //Reading saved data from Ignite. > val data = cache.query(new SqlFieldsQuery("SELECT id, name, department > FROM json_person")).getAll > > println(data.toString) > > //data.foreach { row ⇒ println(row.mkString("[", ", ", "]")) } > } > > def readIgniteUsingSpark(implicit spark: SparkSession) = { > val json_person = spark.read > .format(FORMAT_IGNITE) > .option(OPTION_CONFIG_FILE, CONFIG) > .option(OPTION_TABLE, "json_person") > .load() > > println() > println("Data frame content:json_person") > println() > > //Printing content of data frame to console. > json_person.show() > } > > > def readThinClientTableUsingSpark(implicit spark: SparkSession) = { > val personDataFrame = spark.read > .format(FORMAT_IGNITE) > .option(OPTION_CONFIG_FILE, CONFIG) > .option(OPTION_TABLE, "person") > .load() > > println() > println("Data frame thin connection content:person") > println() > > //Printing content of data frame to console. > personDataFrame.show() > } > > > def main(args: Array[String]): Unit = { > > println() > println("Step 1 setupExampleData:") > println() > > setupExampleData > > println() > println("Step 2 createSparkSession:") > println() > > //Creating spark session. > implicit val spark = SparkSession.builder() > .appName("Spark Ignite data sources example") > .master("local") > .config("spark.executor.instances", "2") > .getOrCreate() > > // Adjust the logger to exclude the logs of no interest. > Logger.getRootLogger.setLevel(Level.ERROR) > Logger.getLogger("org.apache.ignite").setLevel(Level.INFO) > > println() > println("Step 3 ReadIgniteWithThinClient of Step1 Data:") > println() > > sparkReadIgniteWithThinClient(spark) > > println() > println("Step 4 sparkWriteIgniteWithThinClient of Step1 Data:") > println() > > sparkWriteIgniteWithThinClient(spark) > > println() > println("Step 5 writeJSonToIgniteUsingSpark Using Spark:") > println() > > writeJSonToIgniteUsingSpark(spark) > > println() > println("Step 6 readIgniteUsingSpark Using Spark:") > println() > > readIgniteUsingSpark(spark) > > println() > println("Step 7 readThinClientTableUsingSpark Using Spark:") > println() > > readThinClientTableUsingSpark(spark) > > > spark.stop() > > > } > > } > > > example-default.xml > > <?xml version="1.0" encoding="UTF-8"?> > > <!-- > Licensed to the Apache Software Foundation (ASF) under one or more > contributor license agreements. See the NOTICE file distributed with > this work for additional information regarding copyright ownership. > The ASF licenses this file to You under the Apache License, Version 2.0 > (the "License"); you may not use this file except in compliance with > the License. You may obtain a copy of the License at > > http://www.apache.org/licenses/LICENSE-2.0 > > Unless required by applicable law or agreed to in writing, software > distributed under the License is distributed on an "AS IS" BASIS, > WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > See the License for the specific language governing permissions and > limitations under the License. > --> > > <!-- > Ignite configuration with all defaults and enabled p2p deployment and > enabled events. > --> > <beans xmlns="http://www.springframework.org/schema/beans" > xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" > xmlns:util="http://www.springframework.org/schema/util" > xsi:schemaLocation=" > http://www.springframework.org/schema/beans > http://www.springframework.org/schema/beans/spring-beans.xsd > http://www.springframework.org/schema/util > http://www.springframework.org/schema/util/spring-util.xsd"> > <bean abstract="true" id="ignite.cfg" > class="org.apache.ignite.configuration.IgniteConfiguration"> > <!-- Set to true to enable distributed class loading for examples, > default is false. --> > <property name="peerClassLoadingEnabled" value="true"/> > > <!-- Enable task execution events for examples. --> > <property name="includeEventTypes"> > <list> > <!--Task execution events--> > <util:constant > static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/> > <util:constant > static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/> > <util:constant > static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/> > <util:constant > static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/> > <util:constant > static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/> > <util:constant > static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/> > > <!--Cache events--> > <util:constant > static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/> > <util:constant > static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/> > <util:constant > static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/> > </list> > </property> > > <!-- Explicitly configure TCP discovery SPI to provide list of > initial nodes. --> > <property name="discoverySpi"> > <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> > <property name="ipFinder"> > <!-- > Ignite provides several options for automatic > discovery that can be used > instead os static IP based discovery. For information > on all options refer > to our documentation: > http://apacheignite.readme.io/docs/cluster-config > --> > <!-- Uncomment static IP finder to enable static-based > discovery of initial nodes. --> > <!--<bean > class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">--> > <bean > class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> > <property name="addresses"> > <list> > <!-- In distributed environment, replace with > actual host IP address. --> > <value>18.206.247.40:10800</value> > </list> > </property> > </bean> > </property> > </bean> > </property> > </bean> > </beans> > > > Thanks > Sri > > On Fri, Oct 18, 2019 at 9:23 AM sri hari kali charan Tummala < > [email protected]> wrote: > > do you mean communication ports 47100-47200 as mentioned ( > https://www.gridgain.com/docs/8.7.6//installation- > guide/manual-install-on-ec2) ? bare in mind I am running my spark job > outside ignite ec2 box (MAC PC). > > which option is right in my default.xml file? > > Option 1:- > > <property name="addresses"> > <list> > <!-- In distributed environment, replace with > actual host IP address. --> > <value>3.88.248.113:4 > <http://3.88.248.113:10800/>7100..47200</value> > </list> > </property> > > > Option 2:- > > <property name="addresses"> > <list> > <!-- In distributed environment, replace with > actual host IP address. --> > <value>3.88.248.113:4 > <http://3.88.248.113:10800/>7500..47600</value> > </list> > </property> > > > Option 3:- > > <property name="addresses"> > <list> > <!-- In distributed environment, replace with > actual host IP address. --> > <value>3.88.248.113 > <http://3.88.248.113:10800/></value> > </list> > </property> > > > > Thanks > > On Fri, Oct 18, 2019 at 9:18 AM sri hari kali charan Tummala < > [email protected]> wrote: > > Hi Stephen , > > do you mean 3.88.248.113: <http://3.88.248.113:10800/>47500..47700 > something like this? or just public ip 3.88.248.113 > <http://3.88.248.113:10800/> I tried all the possibilities none of them > are getting connected. > > Thanks > Sri > > On Fri, Oct 18, 2019 at 6:02 AM Stephen Darlington < > [email protected]> wrote: > > You’re trying to connect a thick client (the Spark integration) to the > thin client port (10800). Your example-default.xml file needs to have the > same configuration as your server node(s). > > Regards, > Stephen > > On 17 Oct 2019, at 18:12, sri hari kali charan Tummala < > [email protected]> wrote: > > Hi Community, > > I am trying to read and write into the Ignite cluster using apache-spark I > am able to do that using JDBC thin client but not native method as > mentioned in several spark + ignite examples. > > Right now all the spark + ignite examples launch a local ignite cluster > but I want my code connecting to already existing cluster (client). > > Question:- > *How to pass Ignite connection ip and port (10800) 10800 in > example-default.xml ?* > > Error:- > *TcpDiscoverySpi: Failed to connect to any address from IP finder (will > retry to join topology every 2000 ms; change 'reconnectDelay' to configure > the frequency of retries): [/3.88.248.113:10800 > <http://3.88.248.113:10800/>]* > > Working (Spark + Ignite using JDBC):- > > val df = spark.read > .format("jdbc") > .option("url", "jdbc:ignite:thin://3.88.248.113") > .option("fetchsize",100) > //.option("driver", "org.apache.ignite.IgniteJdbcDriver") > .option("dbtable", "Person").load() > > df.printSchema() > > df.createOrReplaceTempView("test") > > spark.sql("select * from test where id=1").show(10) > > spark.sql("select 4,'blah',124232").show(10) > > import java.sql.DriverManager > val connection = > DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113") > > import java.util.Properties > val connectionProperties = new Properties() > > connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113") > > spark.sql("select 4 as ID,'blah' as STREET,124232 as > ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://3.88.248.113", > "Person",connectionProperties) > > spark.read > .format("jdbc") > .option("url", "jdbc:ignite:thin://3.88.248.113") > .option("fetchsize",100) > .option("dbtable", "Person").load().show(10,false) > > > Not Working requires a CONFIG file which is example-default.xml:- > > val igniteDF = spark.read > .format(FORMAT_IGNITE) //Data source type. > .option(OPTION_TABLE, "person") //Table to read. > .option(OPTION_CONFIG_FILE, CONFIG) //Ignite config. > .load() > .filter(col("id") >= 2) //Filter clause. > .filter(col("name") like "%J%") //Another filter clause. > > > Full Code:- (sparkDSLExample) function fails to connect ignite cluster > which I already have > > package com.ignite.examples.spark > > import com.ignite.examples.model.Address > import org.apache.ignite.{Ignite, Ignition} > import org.apache.ignite.cache.query.SqlFieldsQuery > import org.apache.ignite.client.{ClientCache, IgniteClient} > import org.apache.ignite.configuration.{CacheConfiguration, > ClientConfiguration} > import java.lang.{Long => JLong, String => JString} > > import org.apache.ignite.cache.query.SqlFieldsQuery > import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, > OPTION_CONFIG_FILE, OPTION_TABLE} > import org.apache.log4j.{Level, Logger} > import org.apache.spark.sql.{SaveMode, SparkSession} > import org.apache.spark.sql.functions.col > > object SparkClientConnectionTest { > > private val CACHE_NAME = "SparkCache" > > private val CONFIG = > "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml" > > def setupExampleData = { > > val cfg2 = new ClientConfiguration().setAddresses("3.88.248.113:10800") > val igniteClient:IgniteClient = Ignition.startClient(cfg2) > > System.out.format(">>> Created cache [%s].\n", CACHE_NAME) > > val cache:ClientCache[Integer, Address] = > igniteClient.getOrCreateCache(CACHE_NAME) > > cache.query(new SqlFieldsQuery(String.format("DROP TABLE IF EXISTS > Person")) > .setSchema("PUBLIC")).getAll > > cache.query(new SqlFieldsQuery(String.format("CREATE TABLE IF NOT EXISTS > Person (id LONG,street varchar, zip VARCHAR, PRIMARY KEY (id) ) WITH > \"VALUE_TYPE=%s\"", classOf[Address].getName)) > .setSchema("PUBLIC")).getAll > > cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) > VALUES(?,?, ?)").setArgs(1L.asInstanceOf[JLong],"Jameco", > "04074").setSchema("PUBLIC")).getAll > cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) > VALUES(?,?, ?)").setArgs(2L.asInstanceOf[JLong],"Bremar road", > "520003").setSchema("PUBLIC")).getAll > cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) > VALUES(?,?, ?)").setArgs(3L.asInstanceOf[JLong],"orange road", > "1234").setSchema("PUBLIC")).getAll > > System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME) > > val data=cache.query(new SqlFieldsQuery("select * from > Person").setSchema("PUBLIC")).getAll > > println(data.toString) > } > > def sparkDSLExample(implicit spark: SparkSession): Unit = { > println("Querying using Spark DSL.") > println > > > val igniteDF = spark.read > .format(FORMAT_IGNITE) //Data source type. > .option(OPTION_TABLE, "person") //Table to read. > .option(OPTION_CONFIG_FILE, CONFIG) //Ignite config. > .load() > .filter(col("id") >= 2) //Filter clause. > .filter(col("name") like "%J%") //Another filter clause. > > println("Data frame schema:") > > igniteDF.printSchema() //Printing query schema to console. > > println("Data frame content:") > > igniteDF.show() //Printing query results to console. > } > > > def main(args: Array[String]): Unit = { > > setupExampleData > > //Creating spark session. > implicit val spark = SparkSession.builder() > .appName("Spark Ignite data sources example") > .master("local") > .config("spark.executor.instances", "2") > .getOrCreate() > > // Adjust the logger to exclude the logs of no interest. > Logger.getRootLogger.setLevel(Level.ERROR) > Logger.getLogger("org.apache.ignite").setLevel(Level.INFO) > > //sparkDSLExample > > > val df = spark.read > .format("jdbc") > .option("url", "jdbc:ignite:thin://3.88.248.113") > .option("fetchsize",100) > //.option("driver", "org.apache.ignite.IgniteJdbcDriver") > .option("dbtable", "Person").load() > > df.printSchema() > > df.createOrReplaceTempView("test") > > spark.sql("select * from test where id=1").show(10) > > spark.sql("select 4,'blah',124232").show(10) > > import java.sql.DriverManager > val connection = > DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113") > > import java.util.Properties > val connectionProperties = new Properties() > > connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113") > > spark.sql("select 4 as ID,'blah' as STREET,124232 as > ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://3.88.248.113", > "Person",connectionProperties) > > spark.read > .format("jdbc") > .option("url", "jdbc:ignite:thin://3.88.248.113") > .option("fetchsize",100) > .option("dbtable", "Person").load().show(10,false) > > } > > } > > > example-default.xml:- > > <?xml version="1.0" encoding="UTF-8"?> > > <!-- > Licensed to the Apache Software Foundation (ASF) under one or more > contributor license agreements. See the NOTICE file distributed with > this work for additional information regarding copyright ownership. > The ASF licenses this file to You under the Apache License, Version 2.0 > (the "License"); you may not use this file except in compliance with > the License. You may obtain a copy of the License at > > http://www.apache.org/licenses/LICENSE-2.0 > > Unless required by applicable law or agreed to in writing, software > distributed under the License is distributed on an "AS IS" BASIS, > WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > See the License for the specific language governing permissions and > limitations under the License. > --> > > <!-- > Ignite configuration with all defaults and enabled p2p deployment and > enabled events. > --> > <beans xmlns="http://www.springframework.org/schema/beans" > xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" > xmlns:util="http://www.springframework.org/schema/util" > xsi:schemaLocation=" > http://www.springframework.org/schema/beans > http://www.springframework.org/schema/beans/spring-beans.xsd > http://www.springframework.org/schema/util > http://www.springframework.org/schema/util/spring-util.xsd"> > <bean abstract="true" id="ignite.cfg" > class="org.apache.ignite.configuration.IgniteConfiguration"> > <!-- Set to true to enable distributed class loading for examples, > default is false. --> > <property name="peerClassLoadingEnabled" value="true"/> > > <!-- Enable task execution events for examples. --> > <property name="includeEventTypes"> > <list> > <!--Task execution events--> > <util:constant > static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/> > <util:constant > static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/> > <util:constant > static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/> > <util:constant > static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/> > <util:constant > static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/> > <util:constant > static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/> > > <!--Cache events--> > <util:constant > static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/> > <util:constant > static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/> > <util:constant > static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/> > </list> > </property> > > <!-- Explicitly configure TCP discovery SPI to provide list of > initial nodes. --> > <property name="discoverySpi"> > <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> > <property name="ipFinder"> > <!-- > Ignite provides several options for automatic > discovery that can be used > instead os static IP based discovery. For information > on all options refer > to our documentation: > http://apacheignite.readme.io/docs/cluster-config > --> > <!-- Uncomment static IP finder to enable static-based > discovery of initial nodes. --> > <!--<bean > class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">--> > <bean > class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> > <property name="addresses"> > <list> > <!-- In distributed environment, replace with > actual host IP address. --> > <value>3.88.248.113:10800</value> > > -- Thanks & Regards Sri Tummala
