Sri, That's wrong to assume that the Ignite thin client is the only way to access data stored in AWS or other cloud environments. The problems you are experiencing are caused by the fact that your application is running on the laptop while the cluster (server nodes) is within an AWS network.
If you use a thick/regular client, then servers running on AWS need to have a way to *open* network connections to the application. This won't happen if the laptop or dev environment doesn't have public IPs reachable from AWS. Plus, additional network configuration might be needed. Overall, thick clients work with AWS, but either an application needs to be deployed to AWS during development, or you need to start a local cluster in your network/premises first, accomplish development and move on with testing in AWS. A network configuration is also an option but a tricky one and might not be doable. Thin clients are a solution as well if key-value and SQL APIs are all you need. - Denis On Sun, Oct 20, 2019 at 2:06 PM sri hari kali charan Tummala < [email protected]> wrote: > so I got an answer from Grid Grain computing, JDBC thin client is the only > way yo connect the ignite cluster which is running on aws. > > > https://forums.gridgain.com/community-home/digestviewer/viewthread?MessageKey=13f5e836-1569-486a-8475-84c70fc141e0&CommunityKey=3b551477-7e2d-462f-bc5f-d6d10ccbbe35&tab=digestviewer&SuccessMsg=Thank+you+for+submitting+your+message. > > Thanks > Sri > > On Fri, Oct 18, 2019 at 1:38 PM sri hari kali charan Tummala < > [email protected]> wrote: > >> Ok I created a 2 node ec2 instance ignite cluster is below is the right >> way to create cache? the code still using my laptop resources unable to >> connect ignite cluster. >> >> Out put:- >> [13:36:36] Ignite node started OK (id=8535f4d3) >> [13:36:36] Topology snapshot [ver=30, locNode=8535f4d3, *servers=1,* >> clients=1, state=ACTIVE, CPUs=8, *offheap=6.4GB, heap=14.0GB*] >> >>> cache acquired >> >> package com.ignite.examples.igniteStartup >> >> import java.util.Arrays >> import java.util.List >> import com.ignite.examples.model.Address >> import org.apache.commons.logging.Log >> import org.apache.commons.logging.LogFactory >> import org.apache.ignite.Ignite >> import org.apache.ignite.IgniteCache >> import org.apache.ignite.Ignition >> import org.apache.ignite.configuration.IgniteConfiguration >> import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi >> import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder >> >> //remove if not needed >> import scala.collection.JavaConversions._ >> >> object IgniteStart2 { >> >> private var cache: IgniteCache[String, Address] = _ >> >> def main(args: Array[String]): Unit = { >> >> val spi: TcpDiscoverySpi = new TcpDiscoverySpi() >> val ipFinder: TcpDiscoveryVmIpFinder = new TcpDiscoveryVmIpFinder() >> val hostList: List[String] = >> Arrays.asList("127.0.0.1:47500..47509,18.206.247.40:47500..47509,52.207.217.31:47500..47509".split(","): >> _*) >> ipFinder.setAddresses(hostList) >> spi.setIpFinder(ipFinder) >> val cfg: IgniteConfiguration = new IgniteConfiguration() >> cfg.setDiscoverySpi(spi) >> cfg.setClientMode(true) >> System.out.println(">>> I am here") >> >> cfg.setPeerClassLoadingEnabled(true) >> val ignite: Ignite = Ignition.start(cfg) >> cache = Ignition.ignite().cache("test") >> System.out.println(">>> cache acquired") >> >> System.exit(0) >> >> >> } >> >> } >> >> >> On Fri, Oct 18, 2019 at 12:56 PM sri hari kali charan Tummala < >> [email protected]> wrote: >> >>> Hi Stephen , >>> >>> I followed below steps and created one node ec2 instance with Ignite on >>> it now I am connecting to the ignite cluster using spark in two ways 1) >>> thin client 2) I don't know you have to tell me >>> >>> my question is what is the value I have to pass for option 2 is it ec2 >>> instance public IP with port 47500..47509 in my example-default.xml file ? >>> >>> >>> https://www.gridgain.com/docs/8.7.6//installation-guide/manual-install-on-ec2 >>> >>> Thanks >>> Sri >>> >>> On Fri, Oct 18, 2019 at 12:47 PM Stephen Darlington < >>> [email protected]> wrote: >>> >>>> You’re still pointing your Spark node (*thick* client) at port 10800 >>>> (the *thin* client port). This is not going to work. >>>> >>>> You can create a table using sqlline and read it using Spark, or vice >>>> versa. But you need a functioning cluster. >>>> >>>> Check out the documentation on clustering concepts and configuration: >>>> https://www.gridgain.com/docs/latest/developers-guide/clustering/clustering >>>> >>>> >>>> On 18 Oct 2019, at 16:32, sri hari kali charan Tummala < >>>> [email protected]> wrote: >>>> >>>> Hi Stephen/All, >>>> >>>> got it working somewhat using below, but have an issue table and data >>>> which is created using thin client is failing to read using spark but table >>>> created by spark can be read using a thin client, that means table created >>>> in Ignite using spark are the only ones read using spark in Ignite? >>>> >>>> example-default.xml >>>> >>>> <property name="addresses"> >>>> <list> >>>> <!-- In distributed environment, replace with actual host IP >>>> address. --> >>>> <value>18.206.247.40:10800</value> >>>> </list> >>>> </property> >>>> >>>> >>>> NotWorking Scala Function:- >>>> >>>> def readThinClientTableUsingSpark(implicit spark: SparkSession) = { >>>> val personDataFrame = spark.read >>>> .format(FORMAT_IGNITE) >>>> .option(OPTION_CONFIG_FILE, CONFIG) >>>> .option(OPTION_TABLE, "person") >>>> .load() >>>> >>>> println() >>>> println("Data frame thin connection content:person") >>>> println() >>>> >>>> //Printing content of data frame to console. >>>> personDataFrame.show() >>>> } >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> *Exception in thread "main" class org.apache.ignite.IgniteException: >>>> Unknown table person at >>>> org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46) >>>> at >>>> org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46) >>>> at scala.Option.getOrElse(Option.scala:121) at >>>> org.apache.ignite.spark.impl.IgniteSQLRelation.schema(IgniteSQLRelation.scala:46) >>>> at >>>> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:431) >>>> at >>>> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239) >>>> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227) at >>>> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164) at >>>> com.ignite.examples.spark.SparkIgniteCleanCode$.readThinClientTableUsingSpark(SparkIgniteCleanCode.scala:154) >>>> at >>>> com.ignite.examples.spark.SparkIgniteCleanCode$.main(SparkIgniteCleanCode.scala:216) >>>> at >>>> com.ignite.examples.spark.SparkIgniteCleanCode.main(SparkIgniteCleanCode.scala)* >>>> >>>> *Full Code:- (step 7 Fails)* >>>> >>>> package com.ignite.examples.spark >>>> >>>> import com.ignite.examples.model.Address >>>> import org.apache.ignite.{Ignite, Ignition} >>>> import org.apache.ignite.cache.query.SqlFieldsQuery >>>> import org.apache.ignite.client.{ClientCache, IgniteClient} >>>> import org.apache.ignite.configuration.{CacheConfiguration, >>>> ClientConfiguration} >>>> import java.lang.{Long => JLong, String => JString} >>>> >>>> import com.ignite.examples.spark.SparkClientConnectionTest.{CACHE_NAME, >>>> CONFIG} >>>> import org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath >>>> import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, >>>> OPTION_CONFIG_FILE, OPTION_CREATE_TABLE_PARAMETERS, >>>> OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, OPTION_TABLE} >>>> import org.apache.log4j.{Level, Logger} >>>> import org.apache.spark.sql.{SaveMode, SparkSession} >>>> >>>> object SparkIgniteCleanCode { >>>> >>>> private val CACHE_NAME = "SparkCache" >>>> >>>> private val CONFIG = >>>> "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml" >>>> >>>> def setupExampleData = { >>>> >>>> val cfg2 = new >>>> ClientConfiguration().setAddresses("18.206.247.40:10800") >>>> val igniteClient:IgniteClient = Ignition.startClient(cfg2) >>>> >>>> System.out.format(">>> Created cache [%s].\n", CACHE_NAME) >>>> >>>> val cache:ClientCache[Integer, Address] = >>>> igniteClient.getOrCreateCache(CACHE_NAME) >>>> >>>> cache.query(new SqlFieldsQuery(String.format("DROP TABLE IF EXISTS >>>> Person")) >>>> .setSchema("PUBLIC")).getAll >>>> >>>> cache.query(new SqlFieldsQuery(String.format("CREATE TABLE IF NOT >>>> EXISTS Person (id LONG,street varchar, zip VARCHAR, PRIMARY KEY (id) ) >>>> WITH \"VALUE_TYPE=%s\"", classOf[Address].getName)) >>>> .setSchema("PUBLIC")).getAll >>>> >>>> cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) >>>> VALUES(?,?, ?)").setArgs(1L.asInstanceOf[JLong],"Jameco", >>>> "04074").setSchema("PUBLIC")).getAll >>>> cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) >>>> VALUES(?,?, ?)").setArgs(2L.asInstanceOf[JLong],"Bremar road", >>>> "520003").setSchema("PUBLIC")).getAll >>>> cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) >>>> VALUES(?,?, ?)").setArgs(3L.asInstanceOf[JLong],"orange road", >>>> "1234").setSchema("PUBLIC")).getAll >>>> >>>> System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME) >>>> >>>> val data=cache.query(new SqlFieldsQuery("select * from >>>> Person").setSchema("PUBLIC")).getAll >>>> >>>> println(data.toString) >>>> } >>>> >>>> def sparkReadIgniteWithThinClient(implicit spark: SparkSession)={ >>>> >>>> val df = spark.read >>>> .format("jdbc") >>>> .option("url", "jdbc:ignite:thin://18.206.247.40") >>>> .option("fetchsize",100) >>>> //.option("driver", "org.apache.ignite.IgniteJdbcDriver") >>>> .option("dbtable", "Person").load() >>>> >>>> df.printSchema() >>>> >>>> df.createOrReplaceTempView("test") >>>> >>>> spark.sql("select * from test where id=1").show(10) >>>> >>>> spark.sql("select 4,'blah',124232").show(10) >>>> >>>> } >>>> >>>> def sparkWriteIgniteWithThinClient(implicit spark: SparkSession)={ >>>> >>>> import java.sql.DriverManager >>>> val connection = >>>> DriverManager.getConnection("jdbc:ignite:thin://18.206.247.40") >>>> >>>> import java.util.Properties >>>> val connectionProperties = new Properties() >>>> >>>> connectionProperties.put("url", "jdbc:ignite:thin://18.206.247.40") >>>> >>>> spark.sql("select 4 as ID,'blah' as STREET,124232 as >>>> ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://18.206.247.40", >>>> "Person",connectionProperties) >>>> >>>> spark.read >>>> .format("jdbc") >>>> .option("url", "jdbc:ignite:thin://18.206.247.40") >>>> .option("fetchsize",100) >>>> .option("dbtable", "Person").load().show(10,false) >>>> >>>> } >>>> >>>> def writeJSonToIgniteUsingSpark(implicit spark: SparkSession): Unit = { >>>> >>>> val ignite = Ignition.start(CONFIG) >>>> >>>> //Load content of json file to data frame. >>>> val personsDataFrame = spark.read.json( >>>> >>>> resolveIgnitePath("/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/resources/person.json").getAbsolutePath) >>>> >>>> println() >>>> println("Json file content:") >>>> println() >>>> >>>> //Printing content of json file to console. >>>> personsDataFrame.show() >>>> >>>> println() >>>> println("Writing Data Frame to Ignite:") >>>> println() >>>> >>>> //Writing content of data frame to Ignite. >>>> personsDataFrame.write >>>> .format(FORMAT_IGNITE) >>>> .mode(org.apache.spark.sql.SaveMode.Append) >>>> .option(OPTION_CONFIG_FILE, CONFIG) >>>> .option(OPTION_TABLE, "json_person") >>>> .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id") >>>> .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated") >>>> .save() >>>> >>>> println("Done!") >>>> >>>> println() >>>> println("Reading data from Ignite table:") >>>> println() >>>> >>>> val cache = ignite.cache[Any, Any](CACHE_NAME) >>>> >>>> //Reading saved data from Ignite. >>>> val data = cache.query(new SqlFieldsQuery("SELECT id, name, department >>>> FROM json_person")).getAll >>>> >>>> println(data.toString) >>>> >>>> //data.foreach { row ⇒ println(row.mkString("[", ", ", "]")) } >>>> } >>>> >>>> def readIgniteUsingSpark(implicit spark: SparkSession) = { >>>> val json_person = spark.read >>>> .format(FORMAT_IGNITE) >>>> .option(OPTION_CONFIG_FILE, CONFIG) >>>> .option(OPTION_TABLE, "json_person") >>>> .load() >>>> >>>> println() >>>> println("Data frame content:json_person") >>>> println() >>>> >>>> //Printing content of data frame to console. >>>> json_person.show() >>>> } >>>> >>>> >>>> def readThinClientTableUsingSpark(implicit spark: SparkSession) = { >>>> val personDataFrame = spark.read >>>> .format(FORMAT_IGNITE) >>>> .option(OPTION_CONFIG_FILE, CONFIG) >>>> .option(OPTION_TABLE, "person") >>>> .load() >>>> >>>> println() >>>> println("Data frame thin connection content:person") >>>> println() >>>> >>>> //Printing content of data frame to console. >>>> personDataFrame.show() >>>> } >>>> >>>> >>>> def main(args: Array[String]): Unit = { >>>> >>>> println() >>>> println("Step 1 setupExampleData:") >>>> println() >>>> >>>> setupExampleData >>>> >>>> println() >>>> println("Step 2 createSparkSession:") >>>> println() >>>> >>>> //Creating spark session. >>>> implicit val spark = SparkSession.builder() >>>> .appName("Spark Ignite data sources example") >>>> .master("local") >>>> .config("spark.executor.instances", "2") >>>> .getOrCreate() >>>> >>>> // Adjust the logger to exclude the logs of no interest. >>>> Logger.getRootLogger.setLevel(Level.ERROR) >>>> Logger.getLogger("org.apache.ignite").setLevel(Level.INFO) >>>> >>>> println() >>>> println("Step 3 ReadIgniteWithThinClient of Step1 Data:") >>>> println() >>>> >>>> sparkReadIgniteWithThinClient(spark) >>>> >>>> println() >>>> println("Step 4 sparkWriteIgniteWithThinClient of Step1 Data:") >>>> println() >>>> >>>> sparkWriteIgniteWithThinClient(spark) >>>> >>>> println() >>>> println("Step 5 writeJSonToIgniteUsingSpark Using Spark:") >>>> println() >>>> >>>> writeJSonToIgniteUsingSpark(spark) >>>> >>>> println() >>>> println("Step 6 readIgniteUsingSpark Using Spark:") >>>> println() >>>> >>>> readIgniteUsingSpark(spark) >>>> >>>> println() >>>> println("Step 7 readThinClientTableUsingSpark Using Spark:") >>>> println() >>>> >>>> readThinClientTableUsingSpark(spark) >>>> >>>> >>>> spark.stop() >>>> >>>> >>>> } >>>> >>>> } >>>> >>>> >>>> example-default.xml >>>> >>>> <?xml version="1.0" encoding="UTF-8"?> >>>> >>>> <!-- >>>> Licensed to the Apache Software Foundation (ASF) under one or more >>>> contributor license agreements. See the NOTICE file distributed with >>>> this work for additional information regarding copyright ownership. >>>> The ASF licenses this file to You under the Apache License, Version 2.0 >>>> (the "License"); you may not use this file except in compliance with >>>> the License. You may obtain a copy of the License at >>>> >>>> http://www.apache.org/licenses/LICENSE-2.0 >>>> >>>> Unless required by applicable law or agreed to in writing, software >>>> distributed under the License is distributed on an "AS IS" BASIS, >>>> WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. >>>> See the License for the specific language governing permissions and >>>> limitations under the License. >>>> --> >>>> >>>> <!-- >>>> Ignite configuration with all defaults and enabled p2p deployment and >>>> enabled events. >>>> --> >>>> <beans xmlns="http://www.springframework.org/schema/beans" >>>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" >>>> xmlns:util="http://www.springframework.org/schema/util" >>>> xsi:schemaLocation=" >>>> http://www.springframework.org/schema/beans >>>> http://www.springframework.org/schema/beans/spring-beans.xsd >>>> http://www.springframework.org/schema/util >>>> http://www.springframework.org/schema/util/spring-util.xsd"> >>>> <bean abstract="true" id="ignite.cfg" >>>> class="org.apache.ignite.configuration.IgniteConfiguration"> >>>> <!-- Set to true to enable distributed class loading for examples, >>>> default is false. --> >>>> <property name="peerClassLoadingEnabled" value="true"/> >>>> >>>> <!-- Enable task execution events for examples. --> >>>> <property name="includeEventTypes"> >>>> <list> >>>> <!--Task execution events--> >>>> <util:constant >>>> static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/> >>>> <util:constant >>>> static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/> >>>> <util:constant >>>> static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/> >>>> <util:constant >>>> static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/> >>>> <util:constant >>>> static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/> >>>> <util:constant >>>> static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/> >>>> >>>> <!--Cache events--> >>>> <util:constant >>>> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/> >>>> <util:constant >>>> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/> >>>> <util:constant >>>> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/> >>>> </list> >>>> </property> >>>> >>>> <!-- Explicitly configure TCP discovery SPI to provide list of >>>> initial nodes. --> >>>> <property name="discoverySpi"> >>>> <bean >>>> class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> >>>> <property name="ipFinder"> >>>> <!-- >>>> Ignite provides several options for automatic >>>> discovery that can be used >>>> instead os static IP based discovery. For >>>> information on all options refer >>>> to our documentation: >>>> http://apacheignite.readme.io/docs/cluster-config >>>> --> >>>> <!-- Uncomment static IP finder to enable static-based >>>> discovery of initial nodes. --> >>>> <!--<bean >>>> class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">--> >>>> <bean >>>> class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> >>>> <property name="addresses"> >>>> <list> >>>> <!-- In distributed environment, replace >>>> with actual host IP address. --> >>>> <value>18.206.247.40:10800</value> >>>> </list> >>>> </property> >>>> </bean> >>>> </property> >>>> </bean> >>>> </property> >>>> </bean> >>>> </beans> >>>> >>>> >>>> Thanks >>>> Sri >>>> >>>> On Fri, Oct 18, 2019 at 9:23 AM sri hari kali charan Tummala < >>>> [email protected]> wrote: >>>> >>>>> do you mean communication ports 47100-47200 as mentioned ( >>>>> https://www.gridgain.com/docs/8.7.6//installation-guide/manual-install-on-ec2) >>>>> ? bare in mind I am running my spark job outside ignite ec2 box (MAC PC). >>>>> >>>>> which option is right in my default.xml file? >>>>> >>>>> Option 1:- >>>>> >>>>> <property name="addresses"> >>>>> <list> >>>>> <!-- In distributed environment, replace >>>>> with actual host IP address. --> >>>>> <value>3.88.248.113:4 >>>>> <http://3.88.248.113:10800/>7100..47200</value> >>>>> </list> >>>>> </property> >>>>> >>>>> >>>>> Option 2:- >>>>> >>>>> <property name="addresses"> >>>>> <list> >>>>> <!-- In distributed environment, replace >>>>> with actual host IP address. --> >>>>> <value>3.88.248.113:4 >>>>> <http://3.88.248.113:10800/>7500..47600</value> >>>>> </list> >>>>> </property> >>>>> >>>>> >>>>> Option 3:- >>>>> >>>>> <property name="addresses"> >>>>> <list> >>>>> <!-- In distributed environment, replace >>>>> with actual host IP address. --> >>>>> <value>3.88.248.113 >>>>> <http://3.88.248.113:10800/></value> >>>>> </list> >>>>> </property> >>>>> >>>>> >>>>> >>>>> Thanks >>>>> >>>>> On Fri, Oct 18, 2019 at 9:18 AM sri hari kali charan Tummala < >>>>> [email protected]> wrote: >>>>> >>>>>> Hi Stephen , >>>>>> >>>>>> do you mean 3.88.248.113: <http://3.88.248.113:10800/>47500..47700 >>>>>> something like this? or just public ip 3.88.248.113 >>>>>> <http://3.88.248.113:10800/> I tried all the possibilities none of >>>>>> them are getting connected. >>>>>> >>>>>> Thanks >>>>>> Sri >>>>>> >>>>>> On Fri, Oct 18, 2019 at 6:02 AM Stephen Darlington < >>>>>> [email protected]> wrote: >>>>>> >>>>>>> You’re trying to connect a thick client (the Spark integration) to >>>>>>> the thin client port (10800). Your example-default.xml file needs to >>>>>>> have >>>>>>> the same configuration as your server node(s). >>>>>>> >>>>>>> Regards, >>>>>>> Stephen >>>>>>> >>>>>>> On 17 Oct 2019, at 18:12, sri hari kali charan Tummala < >>>>>>> [email protected]> wrote: >>>>>>> >>>>>>> Hi Community, >>>>>>> >>>>>>> I am trying to read and write into the Ignite cluster using >>>>>>> apache-spark I am able to do that using JDBC thin client but not native >>>>>>> method as mentioned in several spark + ignite examples. >>>>>>> >>>>>>> Right now all the spark + ignite examples launch a local ignite >>>>>>> cluster but I want my code connecting to already existing cluster >>>>>>> (client). >>>>>>> >>>>>>> Question:- >>>>>>> *How to pass Ignite connection ip and port (10800) 10800 in >>>>>>> example-default.xml ?* >>>>>>> >>>>>>> Error:- >>>>>>> *TcpDiscoverySpi: Failed to connect to any address from IP finder >>>>>>> (will retry to join topology every 2000 ms; change 'reconnectDelay' to >>>>>>> configure the frequency of retries): [/3.88.248.113:10800 >>>>>>> <http://3.88.248.113:10800/>]* >>>>>>> >>>>>>> Working (Spark + Ignite using JDBC):- >>>>>>> >>>>>>> val df = spark.read >>>>>>> .format("jdbc") >>>>>>> .option("url", "jdbc:ignite:thin://3.88.248.113") >>>>>>> .option("fetchsize",100) >>>>>>> //.option("driver", "org.apache.ignite.IgniteJdbcDriver") >>>>>>> .option("dbtable", "Person").load() >>>>>>> >>>>>>> df.printSchema() >>>>>>> >>>>>>> df.createOrReplaceTempView("test") >>>>>>> >>>>>>> spark.sql("select * from test where id=1").show(10) >>>>>>> >>>>>>> spark.sql("select 4,'blah',124232").show(10) >>>>>>> >>>>>>> import java.sql.DriverManager >>>>>>> val connection = >>>>>>> DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113") >>>>>>> >>>>>>> import java.util.Properties >>>>>>> val connectionProperties = new Properties() >>>>>>> >>>>>>> connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113") >>>>>>> >>>>>>> spark.sql("select 4 as ID,'blah' as STREET,124232 as >>>>>>> ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://3.88.248.113", >>>>>>> "Person",connectionProperties) >>>>>>> >>>>>>> spark.read >>>>>>> .format("jdbc") >>>>>>> .option("url", "jdbc:ignite:thin://3.88.248.113") >>>>>>> .option("fetchsize",100) >>>>>>> .option("dbtable", "Person").load().show(10,false) >>>>>>> >>>>>>> >>>>>>> Not Working requires a CONFIG file which is example-default.xml:- >>>>>>> >>>>>>> val igniteDF = spark.read >>>>>>> .format(FORMAT_IGNITE) //Data source type. >>>>>>> .option(OPTION_TABLE, "person") //Table to read. >>>>>>> .option(OPTION_CONFIG_FILE, CONFIG) //Ignite config. >>>>>>> .load() >>>>>>> .filter(col("id") >= 2) //Filter clause. >>>>>>> .filter(col("name") like "%J%") //Another filter clause. >>>>>>> >>>>>>> >>>>>>> Full Code:- (sparkDSLExample) function fails to connect ignite >>>>>>> cluster which I already have >>>>>>> >>>>>>> package com.ignite.examples.spark >>>>>>> >>>>>>> import com.ignite.examples.model.Address >>>>>>> import org.apache.ignite.{Ignite, Ignition} >>>>>>> import org.apache.ignite.cache.query.SqlFieldsQuery >>>>>>> import org.apache.ignite.client.{ClientCache, IgniteClient} >>>>>>> import org.apache.ignite.configuration.{CacheConfiguration, >>>>>>> ClientConfiguration} >>>>>>> import java.lang.{Long => JLong, String => JString} >>>>>>> >>>>>>> import org.apache.ignite.cache.query.SqlFieldsQuery >>>>>>> import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, >>>>>>> OPTION_CONFIG_FILE, OPTION_TABLE} >>>>>>> import org.apache.log4j.{Level, Logger} >>>>>>> import org.apache.spark.sql.{SaveMode, SparkSession} >>>>>>> import org.apache.spark.sql.functions.col >>>>>>> >>>>>>> object SparkClientConnectionTest { >>>>>>> >>>>>>> private val CACHE_NAME = "SparkCache" >>>>>>> >>>>>>> private val CONFIG = >>>>>>> "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml" >>>>>>> >>>>>>> def setupExampleData = { >>>>>>> >>>>>>> val cfg2 = new >>>>>>> ClientConfiguration().setAddresses("3.88.248.113:10800") >>>>>>> val igniteClient:IgniteClient = Ignition.startClient(cfg2) >>>>>>> >>>>>>> System.out.format(">>> Created cache [%s].\n", CACHE_NAME) >>>>>>> >>>>>>> val cache:ClientCache[Integer, Address] = >>>>>>> igniteClient.getOrCreateCache(CACHE_NAME) >>>>>>> >>>>>>> cache.query(new SqlFieldsQuery(String.format("DROP TABLE IF EXISTS >>>>>>> Person")) >>>>>>> .setSchema("PUBLIC")).getAll >>>>>>> >>>>>>> cache.query(new SqlFieldsQuery(String.format("CREATE TABLE IF NOT >>>>>>> EXISTS Person (id LONG,street varchar, zip VARCHAR, PRIMARY KEY (id) ) >>>>>>> WITH \"VALUE_TYPE=%s\"", classOf[Address].getName)) >>>>>>> .setSchema("PUBLIC")).getAll >>>>>>> >>>>>>> cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) >>>>>>> VALUES(?,?, ?)").setArgs(1L.asInstanceOf[JLong],"Jameco", >>>>>>> "04074").setSchema("PUBLIC")).getAll >>>>>>> cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) >>>>>>> VALUES(?,?, ?)").setArgs(2L.asInstanceOf[JLong],"Bremar road", >>>>>>> "520003").setSchema("PUBLIC")).getAll >>>>>>> cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) >>>>>>> VALUES(?,?, ?)").setArgs(3L.asInstanceOf[JLong],"orange road", >>>>>>> "1234").setSchema("PUBLIC")).getAll >>>>>>> >>>>>>> System.out.format(">>> Data Inserted into Cache [%s].\n", >>>>>>> CACHE_NAME) >>>>>>> >>>>>>> val data=cache.query(new SqlFieldsQuery("select * from >>>>>>> Person").setSchema("PUBLIC")).getAll >>>>>>> >>>>>>> println(data.toString) >>>>>>> } >>>>>>> >>>>>>> def sparkDSLExample(implicit spark: SparkSession): Unit = { >>>>>>> println("Querying using Spark DSL.") >>>>>>> println >>>>>>> >>>>>>> >>>>>>> val igniteDF = spark.read >>>>>>> .format(FORMAT_IGNITE) //Data source type. >>>>>>> .option(OPTION_TABLE, "person") //Table to read. >>>>>>> .option(OPTION_CONFIG_FILE, CONFIG) //Ignite config. >>>>>>> .load() >>>>>>> .filter(col("id") >= 2) //Filter clause. >>>>>>> .filter(col("name") like "%J%") //Another filter clause. >>>>>>> >>>>>>> println("Data frame schema:") >>>>>>> >>>>>>> igniteDF.printSchema() //Printing query schema to console. >>>>>>> >>>>>>> println("Data frame content:") >>>>>>> >>>>>>> igniteDF.show() //Printing query results to console. >>>>>>> } >>>>>>> >>>>>>> >>>>>>> def main(args: Array[String]): Unit = { >>>>>>> >>>>>>> setupExampleData >>>>>>> >>>>>>> //Creating spark session. >>>>>>> implicit val spark = SparkSession.builder() >>>>>>> .appName("Spark Ignite data sources example") >>>>>>> .master("local") >>>>>>> .config("spark.executor.instances", "2") >>>>>>> .getOrCreate() >>>>>>> >>>>>>> // Adjust the logger to exclude the logs of no interest. >>>>>>> Logger.getRootLogger.setLevel(Level.ERROR) >>>>>>> Logger.getLogger("org.apache.ignite").setLevel(Level.INFO) >>>>>>> >>>>>>> //sparkDSLExample >>>>>>> >>>>>>> >>>>>>> val df = spark.read >>>>>>> .format("jdbc") >>>>>>> .option("url", "jdbc:ignite:thin://3.88.248.113") >>>>>>> .option("fetchsize",100) >>>>>>> //.option("driver", "org.apache.ignite.IgniteJdbcDriver") >>>>>>> .option("dbtable", "Person").load() >>>>>>> >>>>>>> df.printSchema() >>>>>>> >>>>>>> df.createOrReplaceTempView("test") >>>>>>> >>>>>>> spark.sql("select * from test where id=1").show(10) >>>>>>> >>>>>>> spark.sql("select 4,'blah',124232").show(10) >>>>>>> >>>>>>> import java.sql.DriverManager >>>>>>> val connection = >>>>>>> DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113") >>>>>>> >>>>>>> import java.util.Properties >>>>>>> val connectionProperties = new Properties() >>>>>>> >>>>>>> connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113") >>>>>>> >>>>>>> spark.sql("select 4 as ID,'blah' as STREET,124232 as >>>>>>> ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://3.88.248.113", >>>>>>> "Person",connectionProperties) >>>>>>> >>>>>>> spark.read >>>>>>> .format("jdbc") >>>>>>> .option("url", "jdbc:ignite:thin://3.88.248.113") >>>>>>> .option("fetchsize",100) >>>>>>> .option("dbtable", "Person").load().show(10,false) >>>>>>> >>>>>>> } >>>>>>> >>>>>>> } >>>>>>> >>>>>>> >>>>>>> example-default.xml:- >>>>>>> >>>>>>> <?xml version="1.0" encoding="UTF-8"?> >>>>>>> >>>>>>> <!-- >>>>>>> Licensed to the Apache Software Foundation (ASF) under one or more >>>>>>> contributor license agreements. See the NOTICE file distributed with >>>>>>> this work for additional information regarding copyright ownership. >>>>>>> The ASF licenses this file to You under the Apache License, Version >>>>>>> 2.0 >>>>>>> (the "License"); you may not use this file except in compliance with >>>>>>> the License. You may obtain a copy of the License at >>>>>>> >>>>>>> http://www.apache.org/licenses/LICENSE-2.0 >>>>>>> >>>>>>> Unless required by applicable law or agreed to in writing, software >>>>>>> distributed under the License is distributed on an "AS IS" BASIS, >>>>>>> WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or >>>>>>> implied. >>>>>>> See the License for the specific language governing permissions and >>>>>>> limitations under the License. >>>>>>> --> >>>>>>> >>>>>>> <!-- >>>>>>> Ignite configuration with all defaults and enabled p2p deployment >>>>>>> and enabled events. >>>>>>> --> >>>>>>> <beans xmlns="http://www.springframework.org/schema/beans" >>>>>>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" >>>>>>> xmlns:util="http://www.springframework.org/schema/util" >>>>>>> xsi:schemaLocation=" >>>>>>> http://www.springframework.org/schema/beans >>>>>>> http://www.springframework.org/schema/beans/spring-beans.xsd >>>>>>> http://www.springframework.org/schema/util >>>>>>> http://www.springframework.org/schema/util/spring-util.xsd"> >>>>>>> <bean abstract="true" id="ignite.cfg" >>>>>>> class="org.apache.ignite.configuration.IgniteConfiguration"> >>>>>>> <!-- Set to true to enable distributed class loading for >>>>>>> examples, default is false. --> >>>>>>> <property name="peerClassLoadingEnabled" value="true"/> >>>>>>> >>>>>>> <!-- Enable task execution events for examples. --> >>>>>>> <property name="includeEventTypes"> >>>>>>> <list> >>>>>>> <!--Task execution events--> >>>>>>> <util:constant >>>>>>> static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/> >>>>>>> <util:constant >>>>>>> static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/> >>>>>>> <util:constant >>>>>>> static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/> >>>>>>> <util:constant >>>>>>> static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/> >>>>>>> <util:constant >>>>>>> static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/> >>>>>>> <util:constant >>>>>>> static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/> >>>>>>> >>>>>>> <!--Cache events--> >>>>>>> <util:constant >>>>>>> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/> >>>>>>> <util:constant >>>>>>> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/> >>>>>>> <util:constant >>>>>>> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/> >>>>>>> </list> >>>>>>> </property> >>>>>>> >>>>>>> <!-- Explicitly configure TCP discovery SPI to provide list of >>>>>>> initial nodes. --> >>>>>>> <property name="discoverySpi"> >>>>>>> <bean >>>>>>> class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> >>>>>>> <property name="ipFinder"> >>>>>>> <!-- >>>>>>> Ignite provides several options for automatic >>>>>>> discovery that can be used >>>>>>> instead os static IP based discovery. For >>>>>>> information on all options refer >>>>>>> to our documentation: >>>>>>> http://apacheignite.readme.io/docs/cluster-config >>>>>>> --> >>>>>>> <!-- Uncomment static IP finder to enable >>>>>>> static-based discovery of initial nodes. --> >>>>>>> <!--<bean >>>>>>> class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">--> >>>>>>> <bean >>>>>>> class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> >>>>>>> <property name="addresses"> >>>>>>> <list> >>>>>>> <!-- In distributed environment, >>>>>>> replace with actual host IP address. --> >>>>>>> <value>3.88.248.113:10800</value> >>>>>>> </list> >>>>>>> </property> >>>>>>> </bean> >>>>>>> </property> >>>>>>> </bean> >>>>>>> </property> >>>>>>> </bean> >>>>>>> </beans> >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> -- >>>>>>> Thanks & Regards >>>>>>> Sri Tummala >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>> >>>>>> -- >>>>>> Thanks & Regards >>>>>> Sri Tummala >>>>>> >>>>>> >>>>> >>>>> -- >>>>> Thanks & Regards >>>>> Sri Tummala >>>>> >>>>> >>>> >>>> -- >>>> Thanks & Regards >>>> Sri Tummala >>>> >>>> >>>> >>>> >>> >>> -- >>> Thanks & Regards >>> Sri Tummala >>> >>> >> >> -- >> Thanks & Regards >> Sri Tummala >> >> > > -- > Thanks & Regards > Sri Tummala > >
