So I got an answer from GridGain: the JDBC thin client is the only way to connect to the Ignite cluster, which is running on AWS.
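For anyone who finds this thread later, here is a minimal sketch of the thin client connection, using plain JDBC without Spark. It assumes ignite-core is on the classpath so the thin JDBC driver can be found, that port 10800 is reachable from the client machine, and that the Person table from the earlier messages already exists. The object name `ThinClientCheck` and the helper `thinUrl` are made up for illustration only.

```scala
import java.sql.DriverManager

object ThinClientCheck {

  // Hypothetical helper: builds the thin client JDBC URL.
  // 10800 is the thin client port discussed in this thread.
  def thinUrl(host: String, port: Int = 10800): String =
    s"jdbc:ignite:thin://$host:$port"

  def main(args: Array[String]): Unit = {
    // Placeholder default; pass the node's public IP as the first argument.
    val host = args.headOption.getOrElse("127.0.0.1")
    val conn = DriverManager.getConnection(thinUrl(host))
    try {
      val stmt = conn.createStatement()
      // Person(id, street, zip) is the table created earlier in the thread.
      val rs = stmt.executeQuery("SELECT id, street, zip FROM Person")
      while (rs.next()) {
        println(rs.getLong(1) + " | " + rs.getString(2) + " | " + rs.getString(3))
      }
    } finally {
      conn.close() // always close the connection when done
    }
  }
}
```

The same URL form is what goes into the Spark `jdbc` reader's `url` option in the working examples quoted below.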
https://forums.gridgain.com/community-home/digestviewer/viewthread?MessageKey=13f5e836-1569-486a-8475-84c70fc141e0&CommunityKey=3b551477-7e2d-462f-bc5f-d6d10ccbbe35&tab=digestviewer&SuccessMsg=Thank+you+for+submitting+your+message.

Thanks
Sri

On Fri, Oct 18, 2019 at 1:38 PM sri hari kali charan Tummala <[email protected]> wrote:

> Ok I created a 2 node ec2 instance ignite cluster is below is the right
> way to create cache? the code still using my laptop resources unable to
> connect ignite cluster.
>
> Out put:-
> [13:36:36] Ignite node started OK (id=8535f4d3)
> [13:36:36] Topology snapshot [ver=30, locNode=8535f4d3, *servers=1,* clients=1, state=ACTIVE, CPUs=8, *offheap=6.4GB, heap=14.0GB*]
> >>> cache acquired
>
> package com.ignite.examples.igniteStartup
>
> import java.util.Arrays
> import java.util.List
> import com.ignite.examples.model.Address
> import org.apache.commons.logging.Log
> import org.apache.commons.logging.LogFactory
> import org.apache.ignite.Ignite
> import org.apache.ignite.IgniteCache
> import org.apache.ignite.Ignition
> import org.apache.ignite.configuration.IgniteConfiguration
> import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
> import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder
>
> //remove if not needed
> import scala.collection.JavaConversions._
>
> object IgniteStart2 {
>
>   private var cache: IgniteCache[String, Address] = _
>
>   def main(args: Array[String]): Unit = {
>
>     val spi: TcpDiscoverySpi = new TcpDiscoverySpi()
>     val ipFinder: TcpDiscoveryVmIpFinder = new TcpDiscoveryVmIpFinder()
>     val hostList: List[String] =
>       Arrays.asList("127.0.0.1:47500..47509,18.206.247.40:47500..47509,52.207.217.31:47500..47509".split(","): _*)
>     ipFinder.setAddresses(hostList)
>     spi.setIpFinder(ipFinder)
>     val cfg: IgniteConfiguration = new IgniteConfiguration()
>     cfg.setDiscoverySpi(spi)
>     cfg.setClientMode(true)
>     System.out.println(">>> I am here")
>
>     cfg.setPeerClassLoadingEnabled(true)
>     val ignite: Ignite = Ignition.start(cfg)
>     cache = Ignition.ignite().cache("test")
>     System.out.println(">>> cache acquired")
>
>     System.exit(0)
>
>   }
>
> }
>
> On Fri, Oct 18, 2019 at 12:56 PM sri hari kali charan Tummala <[email protected]> wrote:
>
>> Hi Stephen ,
>>
>> I followed below steps and created one node ec2 instance with Ignite on
>> it now I am connecting to the ignite cluster using spark in two ways 1)
>> thin client 2) I don't know you have to tell me
>>
>> my question is what is the value I have to pass for option 2 is it ec2
>> instance public IP with port 47500..47509 in my example-default.xml file ?
>>
>> https://www.gridgain.com/docs/8.7.6//installation-guide/manual-install-on-ec2
>>
>> Thanks
>> Sri
>>
>> On Fri, Oct 18, 2019 at 12:47 PM Stephen Darlington <[email protected]> wrote:
>>
>>> You’re still pointing your Spark node (*thick* client) at port 10800
>>> (the *thin* client port). This is not going to work.
>>>
>>> You can create a table using sqlline and read it using Spark, or vice
>>> versa. But you need a functioning cluster.
>>>
>>> Check out the documentation on clustering concepts and configuration:
>>> https://www.gridgain.com/docs/latest/developers-guide/clustering/clustering
>>>
>>> On 18 Oct 2019, at 16:32, sri hari kali charan Tummala <[email protected]> wrote:
>>>
>>> Hi Stephen/All,
>>>
>>> got it working somewhat using below, but have an issue table and data
>>> which is created using thin client is failing to read using spark but table
>>> created by spark can be read using a thin client, that means table created
>>> in Ignite using spark are the only ones read using spark in Ignite?
>>>
>>> example-default.xml
>>>
>>> <property name="addresses">
>>>     <list>
>>>         <!-- In distributed environment, replace with actual host IP address. -->
>>>         <value>18.206.247.40:10800</value>
>>>     </list>
>>> </property>
>>>
>>> NotWorking Scala Function:-
>>>
>>> def readThinClientTableUsingSpark(implicit spark: SparkSession) = {
>>>   val personDataFrame = spark.read
>>>     .format(FORMAT_IGNITE)
>>>     .option(OPTION_CONFIG_FILE, CONFIG)
>>>     .option(OPTION_TABLE, "person")
>>>     .load()
>>>
>>>   println()
>>>   println("Data frame thin connection content:person")
>>>   println()
>>>
>>>   //Printing content of data frame to console.
>>>   personDataFrame.show()
>>> }
>>>
>>> *Exception in thread "main" class org.apache.ignite.IgniteException: Unknown table person
>>>   at org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46)
>>>   at org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46)
>>>   at scala.Option.getOrElse(Option.scala:121)
>>>   at org.apache.ignite.spark.impl.IgniteSQLRelation.schema(IgniteSQLRelation.scala:46)
>>>   at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:431)
>>>   at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
>>>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
>>>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
>>>   at com.ignite.examples.spark.SparkIgniteCleanCode$.readThinClientTableUsingSpark(SparkIgniteCleanCode.scala:154)
>>>   at com.ignite.examples.spark.SparkIgniteCleanCode$.main(SparkIgniteCleanCode.scala:216)
>>>   at com.ignite.examples.spark.SparkIgniteCleanCode.main(SparkIgniteCleanCode.scala)*
>>>
>>> *Full Code:- (step 7 Fails)*
>>>
>>> package com.ignite.examples.spark
>>>
>>> import com.ignite.examples.model.Address
>>> import org.apache.ignite.{Ignite, Ignition}
>>> import org.apache.ignite.cache.query.SqlFieldsQuery
>>> import org.apache.ignite.client.{ClientCache, IgniteClient}
>>> import org.apache.ignite.configuration.{CacheConfiguration, ClientConfiguration}
>>> import java.lang.{Long => JLong, String => JString}
>>>
>>> import com.ignite.examples.spark.SparkClientConnectionTest.{CACHE_NAME, CONFIG}
>>> import org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath
>>> import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, OPTION_CONFIG_FILE, OPTION_CREATE_TABLE_PARAMETERS, OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, OPTION_TABLE}
>>> import org.apache.log4j.{Level, Logger}
>>> import org.apache.spark.sql.{SaveMode, SparkSession}
>>>
>>> object SparkIgniteCleanCode {
>>>
>>>   private val CACHE_NAME = "SparkCache"
>>>
>>>   private val CONFIG = "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"
>>>
>>>   def setupExampleData = {
>>>
>>>     val cfg2 = new ClientConfiguration().setAddresses("18.206.247.40:10800")
>>>     val igniteClient:IgniteClient = Ignition.startClient(cfg2)
>>>
>>>     System.out.format(">>> Created cache [%s].\n", CACHE_NAME)
>>>
>>>     val cache:ClientCache[Integer, Address] = igniteClient.getOrCreateCache(CACHE_NAME)
>>>
>>>     cache.query(new SqlFieldsQuery(String.format("DROP TABLE IF EXISTS Person"))
>>>       .setSchema("PUBLIC")).getAll
>>>
>>>     cache.query(new SqlFieldsQuery(String.format("CREATE TABLE IF NOT EXISTS Person (id LONG,street varchar, zip VARCHAR, PRIMARY KEY (id) ) WITH \"VALUE_TYPE=%s\"", classOf[Address].getName))
>>>       .setSchema("PUBLIC")).getAll
>>>
>>>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(1L.asInstanceOf[JLong],"Jameco", "04074").setSchema("PUBLIC")).getAll
>>>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(2L.asInstanceOf[JLong],"Bremar road", "520003").setSchema("PUBLIC")).getAll
>>>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(3L.asInstanceOf[JLong],"orange road", "1234").setSchema("PUBLIC")).getAll
>>>
>>>     System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME)
>>>
>>>     val data=cache.query(new SqlFieldsQuery("select * from Person").setSchema("PUBLIC")).getAll
>>>
>>>     println(data.toString)
>>>   }
>>>
>>>   def sparkReadIgniteWithThinClient(implicit spark: SparkSession)={
>>>
>>>     val df = spark.read
>>>       .format("jdbc")
>>>       .option("url", "jdbc:ignite:thin://18.206.247.40")
>>>       .option("fetchsize",100)
>>>       //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
>>>       .option("dbtable", "Person").load()
>>>
>>>     df.printSchema()
>>>
>>>     df.createOrReplaceTempView("test")
>>>
>>>     spark.sql("select * from test where id=1").show(10)
>>>
>>>     spark.sql("select 4,'blah',124232").show(10)
>>>
>>>   }
>>>
>>>   def sparkWriteIgniteWithThinClient(implicit spark: SparkSession)={
>>>
>>>     import java.sql.DriverManager
>>>     val connection = DriverManager.getConnection("jdbc:ignite:thin://18.206.247.40")
>>>
>>>     import java.util.Properties
>>>     val connectionProperties = new Properties()
>>>
>>>     connectionProperties.put("url", "jdbc:ignite:thin://18.206.247.40")
>>>
>>>     spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://18.206.247.40", "Person",connectionProperties)
>>>
>>>     spark.read
>>>       .format("jdbc")
>>>       .option("url", "jdbc:ignite:thin://18.206.247.40")
>>>       .option("fetchsize",100)
>>>       .option("dbtable", "Person").load().show(10,false)
>>>
>>>   }
>>>
>>>   def writeJSonToIgniteUsingSpark(implicit spark: SparkSession): Unit = {
>>>
>>>     val ignite = Ignition.start(CONFIG)
>>>
>>>     //Load content of json file to data frame.
>>>     val personsDataFrame = spark.read.json(
>>>       resolveIgnitePath("/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/resources/person.json").getAbsolutePath)
>>>
>>>     println()
>>>     println("Json file content:")
>>>     println()
>>>
>>>     //Printing content of json file to console.
>>>     personsDataFrame.show()
>>>
>>>     println()
>>>     println("Writing Data Frame to Ignite:")
>>>     println()
>>>
>>>     //Writing content of data frame to Ignite.
>>>     personsDataFrame.write
>>>       .format(FORMAT_IGNITE)
>>>       .mode(org.apache.spark.sql.SaveMode.Append)
>>>       .option(OPTION_CONFIG_FILE, CONFIG)
>>>       .option(OPTION_TABLE, "json_person")
>>>       .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
>>>       .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
>>>       .save()
>>>
>>>     println("Done!")
>>>
>>>     println()
>>>     println("Reading data from Ignite table:")
>>>     println()
>>>
>>>     val cache = ignite.cache[Any, Any](CACHE_NAME)
>>>
>>>     //Reading saved data from Ignite.
>>>     val data = cache.query(new SqlFieldsQuery("SELECT id, name, department FROM json_person")).getAll
>>>
>>>     println(data.toString)
>>>
>>>     //data.foreach { row ⇒ println(row.mkString("[", ", ", "]")) }
>>>   }
>>>
>>>   def readIgniteUsingSpark(implicit spark: SparkSession) = {
>>>     val json_person = spark.read
>>>       .format(FORMAT_IGNITE)
>>>       .option(OPTION_CONFIG_FILE, CONFIG)
>>>       .option(OPTION_TABLE, "json_person")
>>>       .load()
>>>
>>>     println()
>>>     println("Data frame content:json_person")
>>>     println()
>>>
>>>     //Printing content of data frame to console.
>>>     json_person.show()
>>>   }
>>>
>>>   def readThinClientTableUsingSpark(implicit spark: SparkSession) = {
>>>     val personDataFrame = spark.read
>>>       .format(FORMAT_IGNITE)
>>>       .option(OPTION_CONFIG_FILE, CONFIG)
>>>       .option(OPTION_TABLE, "person")
>>>       .load()
>>>
>>>     println()
>>>     println("Data frame thin connection content:person")
>>>     println()
>>>
>>>     //Printing content of data frame to console.
>>>     personDataFrame.show()
>>>   }
>>>
>>>   def main(args: Array[String]): Unit = {
>>>
>>>     println()
>>>     println("Step 1 setupExampleData:")
>>>     println()
>>>
>>>     setupExampleData
>>>
>>>     println()
>>>     println("Step 2 createSparkSession:")
>>>     println()
>>>
>>>     //Creating spark session.
>>>     implicit val spark = SparkSession.builder()
>>>       .appName("Spark Ignite data sources example")
>>>       .master("local")
>>>       .config("spark.executor.instances", "2")
>>>       .getOrCreate()
>>>
>>>     // Adjust the logger to exclude the logs of no interest.
>>>     Logger.getRootLogger.setLevel(Level.ERROR)
>>>     Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)
>>>
>>>     println()
>>>     println("Step 3 ReadIgniteWithThinClient of Step1 Data:")
>>>     println()
>>>
>>>     sparkReadIgniteWithThinClient(spark)
>>>
>>>     println()
>>>     println("Step 4 sparkWriteIgniteWithThinClient of Step1 Data:")
>>>     println()
>>>
>>>     sparkWriteIgniteWithThinClient(spark)
>>>
>>>     println()
>>>     println("Step 5 writeJSonToIgniteUsingSpark Using Spark:")
>>>     println()
>>>
>>>     writeJSonToIgniteUsingSpark(spark)
>>>
>>>     println()
>>>     println("Step 6 readIgniteUsingSpark Using Spark:")
>>>     println()
>>>
>>>     readIgniteUsingSpark(spark)
>>>
>>>     println()
>>>     println("Step 7 readThinClientTableUsingSpark Using Spark:")
>>>     println()
>>>
>>>     readThinClientTableUsingSpark(spark)
>>>
>>>     spark.stop()
>>>
>>>   }
>>>
>>> }
>>>
>>> example-default.xml
>>>
>>> <?xml version="1.0" encoding="UTF-8"?>
>>>
>>> <!--
>>>   Licensed to the Apache Software Foundation (ASF) under one or more
>>>   contributor license agreements. See the NOTICE file distributed with
>>>   this work for additional information regarding copyright ownership.
>>>   The ASF licenses this file to You under the Apache License, Version 2.0
>>>   (the "License"); you may not use this file except in compliance with
>>>   the License. You may obtain a copy of the License at
>>>
>>>        http://www.apache.org/licenses/LICENSE-2.0
>>>
>>>   Unless required by applicable law or agreed to in writing, software
>>>   distributed under the License is distributed on an "AS IS" BASIS,
>>>   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>>   See the License for the specific language governing permissions and
>>>   limitations under the License.
>>> -->
>>>
>>> <!--
>>>     Ignite configuration with all defaults and enabled p2p deployment and
>>>     enabled events.
>>> -->
>>> <beans xmlns="http://www.springframework.org/schema/beans"
>>>        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>        xmlns:util="http://www.springframework.org/schema/util"
>>>        xsi:schemaLocation="
>>>         http://www.springframework.org/schema/beans
>>>         http://www.springframework.org/schema/beans/spring-beans.xsd
>>>         http://www.springframework.org/schema/util
>>>         http://www.springframework.org/schema/util/spring-util.xsd">
>>>     <bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
>>>         <!-- Set to true to enable distributed class loading for examples, default is false. -->
>>>         <property name="peerClassLoadingEnabled" value="true"/>
>>>
>>>         <!-- Enable task execution events for examples. -->
>>>         <property name="includeEventTypes">
>>>             <list>
>>>                 <!--Task execution events-->
>>>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
>>>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
>>>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
>>>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
>>>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
>>>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>
>>>
>>>                 <!--Cache events-->
>>>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
>>>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
>>>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
>>>             </list>
>>>         </property>
>>>
>>>         <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
>>>         <property name="discoverySpi">
>>>             <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
>>>                 <property name="ipFinder">
>>>                     <!--
>>>                         Ignite provides several options for automatic discovery that can be used
>>>                         instead os static IP based discovery. For information on all options refer
>>>                         to our documentation: http://apacheignite.readme.io/docs/cluster-config
>>>                     -->
>>>                     <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
>>>                     <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
>>>                     <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
>>>                         <property name="addresses">
>>>                             <list>
>>>                                 <!-- In distributed environment, replace with actual host IP address. -->
>>>                                 <value>18.206.247.40:10800</value>
>>>                             </list>
>>>                         </property>
>>>                     </bean>
>>>                 </property>
>>>             </bean>
>>>         </property>
>>>     </bean>
>>> </beans>
>>>
>>> Thanks
>>> Sri
>>>
>>> On Fri, Oct 18, 2019 at 9:23 AM sri hari kali charan Tummala <[email protected]> wrote:
>>>
>>>> do you mean communication ports 47100-47200 as mentioned (
>>>> https://www.gridgain.com/docs/8.7.6//installation-guide/manual-install-on-ec2)
>>>> ? bare in mind I am running my spark job outside ignite ec2 box (MAC PC).
>>>>
>>>> which option is right in my default.xml file?
>>>>
>>>> Option 1:-
>>>>
>>>> <property name="addresses">
>>>>     <list>
>>>>         <!-- In distributed environment, replace with actual host IP address. -->
>>>>         <value>3.88.248.113:47100..47200</value>
>>>>     </list>
>>>> </property>
>>>>
>>>> Option 2:-
>>>>
>>>> <property name="addresses">
>>>>     <list>
>>>>         <!-- In distributed environment, replace with actual host IP address. -->
>>>>         <value>3.88.248.113:47500..47600</value>
>>>>     </list>
>>>> </property>
>>>>
>>>> Option 3:-
>>>>
>>>> <property name="addresses">
>>>>     <list>
>>>>         <!-- In distributed environment, replace with actual host IP address. -->
>>>>         <value>3.88.248.113</value>
>>>>     </list>
>>>> </property>
>>>>
>>>> Thanks
>>>>
>>>> On Fri, Oct 18, 2019 at 9:18 AM sri hari kali charan Tummala <[email protected]> wrote:
>>>>
>>>>> Hi Stephen ,
>>>>>
>>>>> do you mean 3.88.248.113:47500..47700
>>>>> something like this? or just public ip 3.88.248.113
>>>>> I tried all the possibilities none of
>>>>> them are getting connected.
>>>>>
>>>>> Thanks
>>>>> Sri
>>>>>
>>>>> On Fri, Oct 18, 2019 at 6:02 AM Stephen Darlington <[email protected]> wrote:
>>>>>
>>>>>> You’re trying to connect a thick client (the Spark integration) to
>>>>>> the thin client port (10800). Your example-default.xml file needs to have
>>>>>> the same configuration as your server node(s).
>>>>>>
>>>>>> Regards,
>>>>>> Stephen
>>>>>>
>>>>>> On 17 Oct 2019, at 18:12, sri hari kali charan Tummala <[email protected]> wrote:
>>>>>>
>>>>>> Hi Community,
>>>>>>
>>>>>> I am trying to read and write into the Ignite cluster using
>>>>>> apache-spark I am able to do that using JDBC thin client but not native
>>>>>> method as mentioned in several spark + ignite examples.
>>>>>>
>>>>>> Right now all the spark + ignite examples launch a local ignite
>>>>>> cluster but I want my code connecting to already existing cluster
>>>>>> (client).
>>>>>>
>>>>>> Question:-
>>>>>> *How to pass Ignite connection ip and port (10800) 10800 in
>>>>>> example-default.xml ?*
>>>>>>
>>>>>> Error:-
>>>>>> *TcpDiscoverySpi: Failed to connect to any address from IP finder
>>>>>> (will retry to join topology every 2000 ms; change 'reconnectDelay' to
>>>>>> configure the frequency of retries): [/3.88.248.113:10800]*
>>>>>>
>>>>>> Working (Spark + Ignite using JDBC):-
>>>>>>
>>>>>> val df = spark.read
>>>>>>   .format("jdbc")
>>>>>>   .option("url", "jdbc:ignite:thin://3.88.248.113")
>>>>>>   .option("fetchsize",100)
>>>>>>   //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
>>>>>>   .option("dbtable", "Person").load()
>>>>>>
>>>>>> df.printSchema()
>>>>>>
>>>>>> df.createOrReplaceTempView("test")
>>>>>>
>>>>>> spark.sql("select * from test where id=1").show(10)
>>>>>>
>>>>>> spark.sql("select 4,'blah',124232").show(10)
>>>>>>
>>>>>> import java.sql.DriverManager
>>>>>> val connection = DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")
>>>>>>
>>>>>> import java.util.Properties
>>>>>> val connectionProperties = new Properties()
>>>>>>
>>>>>> connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")
>>>>>>
>>>>>> spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://3.88.248.113", "Person",connectionProperties)
>>>>>>
>>>>>> spark.read
>>>>>>   .format("jdbc")
>>>>>>   .option("url", "jdbc:ignite:thin://3.88.248.113")
>>>>>>   .option("fetchsize",100)
>>>>>>   .option("dbtable", "Person").load().show(10,false)
>>>>>>
>>>>>> Not Working requires a CONFIG file which is example-default.xml:-
>>>>>>
>>>>>> val igniteDF = spark.read
>>>>>>   .format(FORMAT_IGNITE) //Data source type.
>>>>>>   .option(OPTION_TABLE, "person") //Table to read.
>>>>>>   .option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
>>>>>>   .load()
>>>>>>   .filter(col("id") >= 2) //Filter clause.
>>>>>>   .filter(col("name") like "%J%") //Another filter clause.
>>>>>>
>>>>>> Full Code:- (sparkDSLExample) function fails to connect ignite
>>>>>> cluster which I already have
>>>>>>
>>>>>> package com.ignite.examples.spark
>>>>>>
>>>>>> import com.ignite.examples.model.Address
>>>>>> import org.apache.ignite.{Ignite, Ignition}
>>>>>> import org.apache.ignite.cache.query.SqlFieldsQuery
>>>>>> import org.apache.ignite.client.{ClientCache, IgniteClient}
>>>>>> import org.apache.ignite.configuration.{CacheConfiguration, ClientConfiguration}
>>>>>> import java.lang.{Long => JLong, String => JString}
>>>>>>
>>>>>> import org.apache.ignite.cache.query.SqlFieldsQuery
>>>>>> import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, OPTION_CONFIG_FILE, OPTION_TABLE}
>>>>>> import org.apache.log4j.{Level, Logger}
>>>>>> import org.apache.spark.sql.{SaveMode, SparkSession}
>>>>>> import org.apache.spark.sql.functions.col
>>>>>>
>>>>>> object SparkClientConnectionTest {
>>>>>>
>>>>>>   private val CACHE_NAME = "SparkCache"
>>>>>>
>>>>>>   private val CONFIG = "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"
>>>>>>
>>>>>>   def setupExampleData = {
>>>>>>
>>>>>>     val cfg2 = new ClientConfiguration().setAddresses("3.88.248.113:10800")
>>>>>>     val igniteClient:IgniteClient = Ignition.startClient(cfg2)
>>>>>>
>>>>>>     System.out.format(">>> Created cache [%s].\n", CACHE_NAME)
>>>>>>
>>>>>>     val cache:ClientCache[Integer, Address] = igniteClient.getOrCreateCache(CACHE_NAME)
>>>>>>
>>>>>>     cache.query(new SqlFieldsQuery(String.format("DROP TABLE IF EXISTS Person"))
>>>>>>       .setSchema("PUBLIC")).getAll
>>>>>>
>>>>>>     cache.query(new SqlFieldsQuery(String.format("CREATE TABLE IF NOT EXISTS Person (id LONG,street varchar, zip VARCHAR, PRIMARY KEY (id) ) WITH \"VALUE_TYPE=%s\"", classOf[Address].getName))
>>>>>>       .setSchema("PUBLIC")).getAll
>>>>>>
>>>>>>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(1L.asInstanceOf[JLong],"Jameco", "04074").setSchema("PUBLIC")).getAll
>>>>>>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(2L.asInstanceOf[JLong],"Bremar road", "520003").setSchema("PUBLIC")).getAll
>>>>>>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(3L.asInstanceOf[JLong],"orange road", "1234").setSchema("PUBLIC")).getAll
>>>>>>
>>>>>>     System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME)
>>>>>>
>>>>>>     val data=cache.query(new SqlFieldsQuery("select * from Person").setSchema("PUBLIC")).getAll
>>>>>>
>>>>>>     println(data.toString)
>>>>>>   }
>>>>>>
>>>>>>   def sparkDSLExample(implicit spark: SparkSession): Unit = {
>>>>>>     println("Querying using Spark DSL.")
>>>>>>     println
>>>>>>
>>>>>>     val igniteDF = spark.read
>>>>>>       .format(FORMAT_IGNITE) //Data source type.
>>>>>>       .option(OPTION_TABLE, "person") //Table to read.
>>>>>>       .option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
>>>>>>       .load()
>>>>>>       .filter(col("id") >= 2) //Filter clause.
>>>>>>       .filter(col("name") like "%J%") //Another filter clause.
>>>>>>
>>>>>>     println("Data frame schema:")
>>>>>>
>>>>>>     igniteDF.printSchema() //Printing query schema to console.
>>>>>>
>>>>>>     println("Data frame content:")
>>>>>>
>>>>>>     igniteDF.show() //Printing query results to console.
>>>>>>   }
>>>>>>
>>>>>>   def main(args: Array[String]): Unit = {
>>>>>>
>>>>>>     setupExampleData
>>>>>>
>>>>>>     //Creating spark session.
>>>>>>     implicit val spark = SparkSession.builder()
>>>>>>       .appName("Spark Ignite data sources example")
>>>>>>       .master("local")
>>>>>>       .config("spark.executor.instances", "2")
>>>>>>       .getOrCreate()
>>>>>>
>>>>>>     // Adjust the logger to exclude the logs of no interest.
>>>>>>     Logger.getRootLogger.setLevel(Level.ERROR)
>>>>>>     Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)
>>>>>>
>>>>>>     //sparkDSLExample
>>>>>>
>>>>>>     val df = spark.read
>>>>>>       .format("jdbc")
>>>>>>       .option("url", "jdbc:ignite:thin://3.88.248.113")
>>>>>>       .option("fetchsize",100)
>>>>>>       //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
>>>>>>       .option("dbtable", "Person").load()
>>>>>>
>>>>>>     df.printSchema()
>>>>>>
>>>>>>     df.createOrReplaceTempView("test")
>>>>>>
>>>>>>     spark.sql("select * from test where id=1").show(10)
>>>>>>
>>>>>>     spark.sql("select 4,'blah',124232").show(10)
>>>>>>
>>>>>>     import java.sql.DriverManager
>>>>>>     val connection = DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")
>>>>>>
>>>>>>     import java.util.Properties
>>>>>>     val connectionProperties = new Properties()
>>>>>>
>>>>>>     connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")
>>>>>>
>>>>>>     spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://3.88.248.113", "Person",connectionProperties)
>>>>>>
>>>>>>     spark.read
>>>>>>       .format("jdbc")
>>>>>>       .option("url", "jdbc:ignite:thin://3.88.248.113")
>>>>>>       .option("fetchsize",100)
>>>>>>       .option("dbtable", "Person").load().show(10,false)
>>>>>>
>>>>>>   }
>>>>>>
>>>>>> }
>>>>>>
>>>>>> example-default.xml:-
>>>>>>
>>>>>> <?xml version="1.0" encoding="UTF-8"?>
>>>>>>
>>>>>> <!--
>>>>>>   Licensed to the Apache Software Foundation (ASF) under one or more
>>>>>>   contributor license agreements. See the NOTICE file distributed with
>>>>>>   this work for additional information regarding copyright ownership.
>>>>>>   The ASF licenses this file to You under the Apache License, Version 2.0
>>>>>>   (the "License"); you may not use this file except in compliance with
>>>>>>   the License. You may obtain a copy of the License at
>>>>>>
>>>>>>        http://www.apache.org/licenses/LICENSE-2.0
>>>>>>
>>>>>>   Unless required by applicable law or agreed to in writing, software
>>>>>>   distributed under the License is distributed on an "AS IS" BASIS,
>>>>>>   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>>>>>>   implied.
>>>>>>   See the License for the specific language governing permissions and
>>>>>>   limitations under the License.
>>>>>> -->
>>>>>>
>>>>>> <!--
>>>>>>     Ignite configuration with all defaults and enabled p2p deployment
>>>>>>     and enabled events.
>>>>>> -->
>>>>>> <beans xmlns="http://www.springframework.org/schema/beans"
>>>>>>        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>>>>        xmlns:util="http://www.springframework.org/schema/util"
>>>>>>        xsi:schemaLocation="
>>>>>>         http://www.springframework.org/schema/beans
>>>>>>         http://www.springframework.org/schema/beans/spring-beans.xsd
>>>>>>         http://www.springframework.org/schema/util
>>>>>>         http://www.springframework.org/schema/util/spring-util.xsd">
>>>>>>     <bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
>>>>>>         <!-- Set to true to enable distributed class loading for examples, default is false. -->
>>>>>>         <property name="peerClassLoadingEnabled" value="true"/>
>>>>>>
>>>>>>         <!-- Enable task execution events for examples. -->
>>>>>>         <property name="includeEventTypes">
>>>>>>             <list>
>>>>>>                 <!--Task execution events-->
>>>>>>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
>>>>>>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
>>>>>>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
>>>>>>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
>>>>>>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
>>>>>>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>
>>>>>>
>>>>>>                 <!--Cache events-->
>>>>>>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
>>>>>>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
>>>>>>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
>>>>>>             </list>
>>>>>>         </property>
>>>>>>
>>>>>>         <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
>>>>>>         <property name="discoverySpi">
>>>>>>             <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
>>>>>>                 <property name="ipFinder">
>>>>>>                     <!--
>>>>>>                         Ignite provides several options for automatic discovery that can be used
>>>>>>                         instead os static IP based discovery. For information on all options refer
>>>>>>                         to our documentation: http://apacheignite.readme.io/docs/cluster-config
>>>>>>                     -->
>>>>>>                     <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
>>>>>>                     <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
>>>>>>                     <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
>>>>>>                         <property name="addresses">
>>>>>>                             <list>
>>>>>>                                 <!-- In distributed environment, replace with actual host IP address. -->
>>>>>>                                 <value>3.88.248.113:10800</value>
>>>>>>                             </list>
>>>>>>                         </property>
>>>>>>                     </bean>
>>>>>>                 </property>
>>>>>>             </bean>
>>>>>>         </property>
>>>>>>     </bean>
>>>>>> </beans>
>>>>>>
>>>>>> --
>>>>>> Thanks & Regards
>>>>>> Sri Tummala
>>>>>
>>>>> --
>>>>> Thanks & Regards
>>>>> Sri Tummala
>>>>
>>>> --
>>>> Thanks & Regards
>>>> Sri Tummala
>>>
>>> --
>>> Thanks & Regards
>>> Sri Tummala
>>
>> --
>> Thanks & Regards
>> Sri Tummala
>
> --
> Thanks & Regards
> Sri Tummala

--
Thanks & Regards
Sri Tummala
