I got it so I have to build a jar move to aws ignite cluster node and run
it on the node.

Thanks
Sri

On Thursday, October 24, 2019, Denis Magda <[email protected]> wrote:

> Sri,
>
> That's wrong to assume that the Ignite thin client is the only way to
> access data stored in AWS or other cloud environments. The problems you are
> experiencing are caused by the fact that your application is running on the
> laptop while the cluster (server nodes) is within an AWS network.
>
> If you use a thick/regular client, then servers running on AWS need to
> have a way to *open* network connections to the application. This won't
> happen if the laptop or dev environment doesn't have public IPs reachable
> from AWS. Plus, additional network configuration might be needed.
>
> Overall, thick clients work with AWS, but either an application needs to
> be deployed to AWS during development, or you need to start a local cluster
> in your network/premises first, accomplish development and move on with
> testing in AWS. A network configuration is also an option but a tricky one
> and might not be doable.
>
> Thin clients are a solution as well if key-value and SQL APIs are all you
> need.
>
> -
> Denis
>
>
> On Sun, Oct 20, 2019 at 2:06 PM sri hari kali charan Tummala <
> [email protected]> wrote:
>
> so I got an answer from Grid Grain computing, JDBC thin client is the only
> way yo connect the ignite cluster which is running on aws.
>
> https://forums.gridgain.com/community-home/digestviewer/
> viewthread?MessageKey=13f5e836-1569-486a-8475-84c70fc141e0&CommunityKey=
> 3b551477-7e2d-462f-bc5f-d6d10ccbbe35&tab=digestviewer&
> SuccessMsg=Thank+you+for+submitting+your+message.
>
> Thanks
> Sri
>
> On Fri, Oct 18, 2019 at 1:38 PM sri hari kali charan Tummala <
> [email protected]> wrote:
>
> Ok I created a 2 node ec2 instance ignite cluster is below is the right
> way to create cache? the code still using my laptop resources unable to
> connect ignite cluster.
>
> Out put:-
> [13:36:36] Ignite node started OK (id=8535f4d3)
> [13:36:36] Topology snapshot [ver=30, locNode=8535f4d3, *servers=1,*
> clients=1, state=ACTIVE, CPUs=8, *offheap=6.4GB, heap=14.0GB*]
> >>> cache acquired
>
> package com.ignite.examples.igniteStartup
>
> import java.util.Arrays
> import java.util.List
> import com.ignite.examples.model.Address
> import org.apache.commons.logging.Log
> import org.apache.commons.logging.LogFactory
> import org.apache.ignite.Ignite
> import org.apache.ignite.IgniteCache
> import org.apache.ignite.Ignition
> import org.apache.ignite.configuration.IgniteConfiguration
> import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
> import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder
>
> //remove if not needed
> import scala.collection.JavaConversions._
>
> object IgniteStart2 {
>
>   private var cache: IgniteCache[String, Address] = _
>
>   def main(args: Array[String]): Unit = {
>
>     val spi: TcpDiscoverySpi = new TcpDiscoverySpi()
>     val ipFinder: TcpDiscoveryVmIpFinder = new TcpDiscoveryVmIpFinder()
>     val hostList: List[String] = 
> Arrays.asList("127.0.0.1:47500..47509,18.206.247.40:47500..47509,52.207.217.31:47500..47509".split(","):
>  _*)
>     ipFinder.setAddresses(hostList)
>     spi.setIpFinder(ipFinder)
>     val cfg: IgniteConfiguration = new IgniteConfiguration()
>     cfg.setDiscoverySpi(spi)
>     cfg.setClientMode(true)
>     System.out.println(">>> I am here")
>
>     cfg.setPeerClassLoadingEnabled(true)
>     val ignite: Ignite = Ignition.start(cfg)
>     cache = Ignition.ignite().cache("test")
>     System.out.println(">>> cache acquired")
>
>     System.exit(0)
>
>
>   }
>
> }
>
>
> On Fri, Oct 18, 2019 at 12:56 PM sri hari kali charan Tummala <
> [email protected]> wrote:
>
> Hi Stephen ,
>
> I followed below steps and created one node ec2 instance with Ignite on it
> now I am connecting to the ignite cluster using spark in two ways 1) thin
> client 2) I don't know you have to tell me
>
> my question is what is the value I have to pass for option 2 is it ec2
> instance public IP with port 47500..47509  in my example-default.xml file ?
>
> https://www.gridgain.com/docs/8.7.6//installation-guide/
> manual-install-on-ec2
>
> Thanks
> Sri
>
> On Fri, Oct 18, 2019 at 12:47 PM Stephen Darlington <
> [email protected]> wrote:
>
> You’re still pointing your Spark node (*thick* client) at port 10800 (the
> *thin* client port). This is not going to work.
>
> You can create a table using sqlline and read it using Spark, or vice
> versa. But you need a functioning cluster.
>
> Check out the documentation on clustering concepts and configuration:
> https://www.gridgain.com/docs/latest/developers-guide/clustering/
> clustering
>
> On 18 Oct 2019, at 16:32, sri hari kali charan Tummala <
> [email protected]> wrote:
>
> Hi Stephen/All,
>
> got it working somewhat using below, but have an issue table and data
> which is created using thin client is failing to read using spark but table
> created by spark can be read using a thin client, that means table created
> in Ignite using spark are the only ones read using spark in Ignite?
>
> example-default.xml
>
> <property name="addresses">
>     <list>
>         <!-- In distributed environment, replace with actual host IP address. 
> -->
>         <value>18.206.247.40:10800</value>
>     </list>
> </property>
>
>
> NotWorking Scala Function:-
>
> def readThinClientTableUsingSpark(implicit spark: SparkSession) = {
>   val personDataFrame = spark.read
>     .format(FORMAT_IGNITE)
>     .option(OPTION_CONFIG_FILE, CONFIG)
>     .option(OPTION_TABLE, "person")
>     .load()
>
>   println()
>   println("Data frame thin connection content:person")
>   println()
>
>   //Printing content of data frame to console.
>   personDataFrame.show()
> }
>
>
>
>
>
>
>
>
>
>
>
>
>
> *Exception in thread "main" class org.apache.ignite.IgniteException:
> Unknown table person at
> org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46)
> at
> org.apache.ignite.spark.impl.IgniteSQLRelation$$anonfun$schema$2.apply(IgniteSQLRelation.scala:46)
> at scala.Option.getOrElse(Option.scala:121) at
> org.apache.ignite.spark.impl.IgniteSQLRelation.schema(IgniteSQLRelation.scala:46)
> at
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:431)
> at
> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227) at
> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164) at
> com.ignite.examples.spark.SparkIgniteCleanCode$.readThinClientTableUsingSpark(SparkIgniteCleanCode.scala:154)
> at
> com.ignite.examples.spark.SparkIgniteCleanCode$.main(SparkIgniteCleanCode.scala:216)
> at
> com.ignite.examples.spark.SparkIgniteCleanCode.main(SparkIgniteCleanCode.scala)*
>
> *Full Code:- (step 7 Fails)*
>
> package com.ignite.examples.spark
>
> import com.ignite.examples.model.Address
> import org.apache.ignite.{Ignite, Ignition}
> import org.apache.ignite.cache.query.SqlFieldsQuery
> import org.apache.ignite.client.{ClientCache, IgniteClient}
> import org.apache.ignite.configuration.{CacheConfiguration, 
> ClientConfiguration}
> import java.lang.{Long => JLong, String => JString}
>
> import com.ignite.examples.spark.SparkClientConnectionTest.{CACHE_NAME, 
> CONFIG}
> import org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath
> import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, 
> OPTION_CONFIG_FILE, OPTION_CREATE_TABLE_PARAMETERS, 
> OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, OPTION_TABLE}
> import org.apache.log4j.{Level, Logger}
> import org.apache.spark.sql.{SaveMode, SparkSession}
>
> object SparkIgniteCleanCode {
>
>   private val CACHE_NAME = "SparkCache"
>
>   private val CONFIG = 
> "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"
>
>   def setupExampleData = {
>
>     val cfg2 = new ClientConfiguration().setAddresses("18.206.247.40:10800")
>     val igniteClient:IgniteClient = Ignition.startClient(cfg2)
>
>     System.out.format(">>> Created cache [%s].\n", CACHE_NAME)
>
>     val cache:ClientCache[Integer, Address] = 
> igniteClient.getOrCreateCache(CACHE_NAME)
>
>     cache.query(new SqlFieldsQuery(String.format("DROP TABLE IF EXISTS 
> Person"))
>       .setSchema("PUBLIC")).getAll
>
>     cache.query(new SqlFieldsQuery(String.format("CREATE TABLE IF NOT EXISTS 
> Person (id LONG,street varchar, zip VARCHAR, PRIMARY KEY (id) ) WITH 
> \"VALUE_TYPE=%s\"", classOf[Address].getName))
>       .setSchema("PUBLIC")).getAll
>
>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) 
> VALUES(?,?, ?)").setArgs(1L.asInstanceOf[JLong],"Jameco", 
> "04074").setSchema("PUBLIC")).getAll
>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) 
> VALUES(?,?, ?)").setArgs(2L.asInstanceOf[JLong],"Bremar road", 
> "520003").setSchema("PUBLIC")).getAll
>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) 
> VALUES(?,?, ?)").setArgs(3L.asInstanceOf[JLong],"orange road", 
> "1234").setSchema("PUBLIC")).getAll
>
>     System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME)
>
>     val data=cache.query(new SqlFieldsQuery("select * from 
> Person").setSchema("PUBLIC")).getAll
>
>     println(data.toString)
>   }
>
>   def sparkReadIgniteWithThinClient(implicit spark: SparkSession)={
>
>     val df = spark.read
>       .format("jdbc")
>       .option("url", "jdbc:ignite:thin://18.206.247.40")
>       .option("fetchsize",100)
>       //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
>       .option("dbtable", "Person").load()
>
>     df.printSchema()
>
>     df.createOrReplaceTempView("test")
>
>     spark.sql("select * from test where id=1").show(10)
>
>     spark.sql("select 4,'blah',124232").show(10)
>
>   }
>
>   def sparkWriteIgniteWithThinClient(implicit spark: SparkSession)={
>
>     import java.sql.DriverManager
>     val connection = 
> DriverManager.getConnection("jdbc:ignite:thin://18.206.247.40")
>
>     import java.util.Properties
>     val connectionProperties = new Properties()
>
>     connectionProperties.put("url", "jdbc:ignite:thin://18.206.247.40")
>
>     spark.sql("select 4 as ID,'blah' as STREET,124232 as 
> ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://18.206.247.40",
>       "Person",connectionProperties)
>
>     spark.read
>       .format("jdbc")
>       .option("url", "jdbc:ignite:thin://18.206.247.40")
>       .option("fetchsize",100)
>       .option("dbtable", "Person").load().show(10,false)
>
>   }
>
>   def writeJSonToIgniteUsingSpark(implicit spark: SparkSession): Unit = {
>
>     val ignite = Ignition.start(CONFIG)
>
>     //Load content of json file to data frame.
>     val personsDataFrame = spark.read.json(
>       
> resolveIgnitePath("/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/resources/person.json").getAbsolutePath)
>
>     println()
>     println("Json file content:")
>     println()
>
>     //Printing content of json file to console.
>     personsDataFrame.show()
>
>     println()
>     println("Writing Data Frame to Ignite:")
>     println()
>
>     //Writing content of data frame to Ignite.
>     personsDataFrame.write
>       .format(FORMAT_IGNITE)
>       .mode(org.apache.spark.sql.SaveMode.Append)
>       .option(OPTION_CONFIG_FILE, CONFIG)
>       .option(OPTION_TABLE, "json_person")
>       .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
>       .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
>       .save()
>
>     println("Done!")
>
>     println()
>     println("Reading data from Ignite table:")
>     println()
>
>     val cache = ignite.cache[Any, Any](CACHE_NAME)
>
>     //Reading saved data from Ignite.
>     val data = cache.query(new SqlFieldsQuery("SELECT id, name, department 
> FROM json_person")).getAll
>
>     println(data.toString)
>
>     //data.foreach { row ⇒ println(row.mkString("[", ", ", "]")) }
>   }
>
>   def readIgniteUsingSpark(implicit spark: SparkSession) = {
>     val json_person = spark.read
>       .format(FORMAT_IGNITE)
>       .option(OPTION_CONFIG_FILE, CONFIG)
>       .option(OPTION_TABLE, "json_person")
>       .load()
>
>     println()
>     println("Data frame content:json_person")
>     println()
>
>     //Printing content of data frame to console.
>     json_person.show()
>   }
>
>
>   def readThinClientTableUsingSpark(implicit spark: SparkSession) = {
>     val personDataFrame = spark.read
>       .format(FORMAT_IGNITE)
>       .option(OPTION_CONFIG_FILE, CONFIG)
>       .option(OPTION_TABLE, "person")
>       .load()
>
>     println()
>     println("Data frame thin connection content:person")
>     println()
>
>     //Printing content of data frame to console.
>     personDataFrame.show()
>   }
>
>
>   def main(args: Array[String]): Unit = {
>
>     println()
>     println("Step 1 setupExampleData:")
>     println()
>
>     setupExampleData
>
>     println()
>     println("Step 2 createSparkSession:")
>     println()
>
>     //Creating spark session.
>     implicit val spark = SparkSession.builder()
>       .appName("Spark Ignite data sources example")
>       .master("local")
>       .config("spark.executor.instances", "2")
>       .getOrCreate()
>
>     // Adjust the logger to exclude the logs of no interest.
>     Logger.getRootLogger.setLevel(Level.ERROR)
>     Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)
>
>     println()
>     println("Step 3 ReadIgniteWithThinClient of Step1 Data:")
>     println()
>
>     sparkReadIgniteWithThinClient(spark)
>
>     println()
>     println("Step 4 sparkWriteIgniteWithThinClient of Step1 Data:")
>     println()
>
>     sparkWriteIgniteWithThinClient(spark)
>
>     println()
>     println("Step 5 writeJSonToIgniteUsingSpark Using Spark:")
>     println()
>
>     writeJSonToIgniteUsingSpark(spark)
>
>     println()
>     println("Step 6 readIgniteUsingSpark Using Spark:")
>     println()
>
>     readIgniteUsingSpark(spark)
>
>     println()
>     println("Step 7 readThinClientTableUsingSpark Using Spark:")
>     println()
>
>     readThinClientTableUsingSpark(spark)
>
>
>     spark.stop()
>
>
>   }
>
> }
>
>
> example-default.xml
>
> <?xml version="1.0" encoding="UTF-8"?>
>
> <!--
>   Licensed to the Apache Software Foundation (ASF) under one or more
>   contributor license agreements.  See the NOTICE file distributed with
>   this work for additional information regarding copyright ownership.
>   The ASF licenses this file to You under the Apache License, Version 2.0
>   (the "License"); you may not use this file except in compliance with
>   the License.  You may obtain a copy of the License at
>
>        http://www.apache.org/licenses/LICENSE-2.0
>
>   Unless required by applicable law or agreed to in writing, software
>   distributed under the License is distributed on an "AS IS" BASIS,
>   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>   See the License for the specific language governing permissions and
>   limitations under the License.
> -->
>
> <!--
>     Ignite configuration with all defaults and enabled p2p deployment and 
> enabled events.
> -->
> <beans xmlns="http://www.springframework.org/schema/beans";
>        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
>        xmlns:util="http://www.springframework.org/schema/util";
>        xsi:schemaLocation="
>         http://www.springframework.org/schema/beans
>         http://www.springframework.org/schema/beans/spring-beans.xsd
>         http://www.springframework.org/schema/util
>         http://www.springframework.org/schema/util/spring-util.xsd";>
>     <bean abstract="true" id="ignite.cfg" 
> class="org.apache.ignite.configuration.IgniteConfiguration">
>         <!-- Set to true to enable distributed class loading for examples, 
> default is false. -->
>         <property name="peerClassLoadingEnabled" value="true"/>
>
>         <!-- Enable task execution events for examples. -->
>         <property name="includeEventTypes">
>             <list>
>                 <!--Task execution events-->
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>
>
>                 <!--Cache events-->
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
>             </list>
>         </property>
>
>         <!-- Explicitly configure TCP discovery SPI to provide list of 
> initial nodes. -->
>         <property name="discoverySpi">
>             <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
>                 <property name="ipFinder">
>                     <!--
>                         Ignite provides several options for automatic 
> discovery that can be used
>                         instead os static IP based discovery. For information 
> on all options refer
>                         to our documentation: 
> http://apacheignite.readme.io/docs/cluster-config
>                     -->
>                     <!-- Uncomment static IP finder to enable static-based 
> discovery of initial nodes. -->
>                     <!--<bean 
> class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
>                     <bean 
> class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
>                         <property name="addresses">
>                             <list>
>                                 <!-- In distributed environment, replace with 
> actual host IP address. -->
>                                 <value>18.206.247.40:10800</value>
>                             </list>
>                         </property>
>                     </bean>
>                 </property>
>             </bean>
>         </property>
>     </bean>
> </beans>
>
>
> Thanks
> Sri
>
> On Fri, Oct 18, 2019 at 9:23 AM sri hari kali charan Tummala <
> [email protected]> wrote:
>
> do you mean communication ports 47100-47200 as mentioned (
> https://www.gridgain.com/docs/8.7.6//installation-
> guide/manual-install-on-ec2) ? bare in mind I am running my spark job
> outside ignite ec2 box (MAC PC).
>
> which option is right in my default.xml file?
>
> Option 1:-
>
>                         <property name="addresses">
>                             <list>
>                                 <!-- In distributed environment, replace with 
> actual host IP address. -->
>                                 <value>3.88.248.113:4 
> <http://3.88.248.113:10800/>7100..47200</value>
>                             </list>
>                         </property>
>
>
> Option 2:-
>
>                         <property name="addresses">
>                             <list>
>                                 <!-- In distributed environment, replace with 
> actual host IP address. -->
>                                 <value>3.88.248.113:4 
> <http://3.88.248.113:10800/>7500..47600</value>
>                             </list>
>                         </property>
>
>
> Option 3:-
>
>                         <property name="addresses">
>                             <list>
>                                 <!-- In distributed environment, replace with 
> actual host IP address. -->
>                                 <value>3.88.248.113 
> <http://3.88.248.113:10800/></value>
>                             </list>
>                         </property>
>
>
>
> Thanks
>
> On Fri, Oct 18, 2019 at 9:18 AM sri hari kali charan Tummala <
> [email protected]> wrote:
>
> Hi Stephen ,
>
>  do you mean 3.88.248.113: <http://3.88.248.113:10800/>47500..47700
> something like this? or just public ip 3.88.248.113
> <http://3.88.248.113:10800/> I tried all the possibilities none of them
> are getting connected.
>
> Thanks
> Sri
>
> On Fri, Oct 18, 2019 at 6:02 AM Stephen Darlington <
> [email protected]> wrote:
>
> You’re trying to connect a thick client (the Spark integration) to the
> thin client port (10800). Your example-default.xml file needs to have the
> same configuration as your server node(s).
>
> Regards,
> Stephen
>
> On 17 Oct 2019, at 18:12, sri hari kali charan Tummala <
> [email protected]> wrote:
>
> Hi Community,
>
> I am trying to read and write into the Ignite cluster using apache-spark I
> am able to do that using JDBC thin client but not native method as
> mentioned in several spark + ignite examples.
>
> Right now all the spark + ignite examples launch a local ignite cluster
> but I want my code connecting to already existing cluster (client).
>
> Question:-
> *How to pass Ignite connection ip and port (10800)  10800 in
> example-default.xml ?*
>
> Error:-
> *TcpDiscoverySpi: Failed to connect to any address from IP finder (will
> retry to join topology every 2000 ms; change 'reconnectDelay' to configure
> the frequency of retries): [/3.88.248.113:10800
> <http://3.88.248.113:10800/>]*
>
> Working (Spark + Ignite using JDBC):-
>
> val df = spark.read
> .format("jdbc")
> .option("url", "jdbc:ignite:thin://3.88.248.113")
> .option("fetchsize",100)
> //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
> .option("dbtable", "Person").load()
>
> df.printSchema()
>
> df.createOrReplaceTempView("test")
>
> spark.sql("select * from test where id=1").show(10)
>
> spark.sql("select 4,'blah',124232").show(10)
>
> import java.sql.DriverManager
> val connection = 
> DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")
>
> import java.util.Properties
> val connectionProperties = new Properties()
>
> connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")
>
> spark.sql("select 4 as ID,'blah' as STREET,124232 as 
> ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://3.88.248.113",
>   "Person",connectionProperties)
>
> spark.read
>   .format("jdbc")
>   .option("url", "jdbc:ignite:thin://3.88.248.113")
>   .option("fetchsize",100)
>   .option("dbtable", "Person").load().show(10,false)
>
>
> Not Working requires a CONFIG file which is example-default.xml:-
>
> val igniteDF = spark.read
>   .format(FORMAT_IGNITE) //Data source type.
>   .option(OPTION_TABLE, "person") //Table to read.
>   .option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
>   .load()
>   .filter(col("id") >= 2) //Filter clause.
>   .filter(col("name") like "%J%") //Another filter clause.
>
>
> Full Code:- (sparkDSLExample) function fails to connect ignite cluster
> which I already have
>
> package com.ignite.examples.spark
>
> import com.ignite.examples.model.Address
> import org.apache.ignite.{Ignite, Ignition}
> import org.apache.ignite.cache.query.SqlFieldsQuery
> import org.apache.ignite.client.{ClientCache, IgniteClient}
> import org.apache.ignite.configuration.{CacheConfiguration, 
> ClientConfiguration}
> import java.lang.{Long => JLong, String => JString}
>
> import org.apache.ignite.cache.query.SqlFieldsQuery
> import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, 
> OPTION_CONFIG_FILE, OPTION_TABLE}
> import org.apache.log4j.{Level, Logger}
> import org.apache.spark.sql.{SaveMode, SparkSession}
> import org.apache.spark.sql.functions.col
>
> object SparkClientConnectionTest {
>
>   private val CACHE_NAME = "SparkCache"
>
>   private val CONFIG = 
> "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"
>
>   def setupExampleData = {
>
>     val cfg2 = new ClientConfiguration().setAddresses("3.88.248.113:10800")
>     val igniteClient:IgniteClient = Ignition.startClient(cfg2)
>
>     System.out.format(">>> Created cache [%s].\n", CACHE_NAME)
>
>     val cache:ClientCache[Integer, Address] = 
> igniteClient.getOrCreateCache(CACHE_NAME)
>
>     cache.query(new SqlFieldsQuery(String.format("DROP TABLE IF EXISTS 
> Person"))
>       .setSchema("PUBLIC")).getAll
>
>     cache.query(new SqlFieldsQuery(String.format("CREATE TABLE IF NOT EXISTS 
> Person (id LONG,street varchar, zip VARCHAR, PRIMARY KEY (id) ) WITH 
> \"VALUE_TYPE=%s\"", classOf[Address].getName))
>       .setSchema("PUBLIC")).getAll
>
>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) 
> VALUES(?,?, ?)").setArgs(1L.asInstanceOf[JLong],"Jameco", 
> "04074").setSchema("PUBLIC")).getAll
>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) 
> VALUES(?,?, ?)").setArgs(2L.asInstanceOf[JLong],"Bremar road", 
> "520003").setSchema("PUBLIC")).getAll
>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) 
> VALUES(?,?, ?)").setArgs(3L.asInstanceOf[JLong],"orange road", 
> "1234").setSchema("PUBLIC")).getAll
>
>     System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME)
>
>     val data=cache.query(new SqlFieldsQuery("select * from 
> Person").setSchema("PUBLIC")).getAll
>
>     println(data.toString)
>   }
>
>   def sparkDSLExample(implicit spark: SparkSession): Unit = {
>     println("Querying using Spark DSL.")
>     println
>
>
>     val igniteDF = spark.read
>       .format(FORMAT_IGNITE) //Data source type.
>       .option(OPTION_TABLE, "person") //Table to read.
>       .option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
>       .load()
>       .filter(col("id") >= 2) //Filter clause.
>       .filter(col("name") like "%J%") //Another filter clause.
>
>     println("Data frame schema:")
>
>     igniteDF.printSchema() //Printing query schema to console.
>
>     println("Data frame content:")
>
>     igniteDF.show() //Printing query results to console.
>   }
>
>
>   def main(args: Array[String]): Unit = {
>
>     setupExampleData
>
>     //Creating spark session.
>     implicit val spark = SparkSession.builder()
>       .appName("Spark Ignite data sources example")
>       .master("local")
>       .config("spark.executor.instances", "2")
>       .getOrCreate()
>
>     // Adjust the logger to exclude the logs of no interest.
>     Logger.getRootLogger.setLevel(Level.ERROR)
>     Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)
>
>     //sparkDSLExample
>
>
>     val df = spark.read
>     .format("jdbc")
>     .option("url", "jdbc:ignite:thin://3.88.248.113")
>     .option("fetchsize",100)
>     //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
>     .option("dbtable", "Person").load()
>
>     df.printSchema()
>
>     df.createOrReplaceTempView("test")
>
>     spark.sql("select * from test where id=1").show(10)
>
>     spark.sql("select 4,'blah',124232").show(10)
>
>     import java.sql.DriverManager
>     val connection = 
> DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")
>
>     import java.util.Properties
>     val connectionProperties = new Properties()
>
>     connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")
>
>     spark.sql("select 4 as ID,'blah' as STREET,124232 as 
> ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://3.88.248.113",
>       "Person",connectionProperties)
>
>     spark.read
>       .format("jdbc")
>       .option("url", "jdbc:ignite:thin://3.88.248.113")
>       .option("fetchsize",100)
>       .option("dbtable", "Person").load().show(10,false)
>
>   }
>
> }
>
>
> example-default.xml:-
>
> <?xml version="1.0" encoding="UTF-8"?>
>
> <!--
>   Licensed to the Apache Software Foundation (ASF) under one or more
>   contributor license agreements.  See the NOTICE file distributed with
>   this work for additional information regarding copyright ownership.
>   The ASF licenses this file to You under the Apache License, Version 2.0
>   (the "License"); you may not use this file except in compliance with
>   the License.  You may obtain a copy of the License at
>
>        http://www.apache.org/licenses/LICENSE-2.0
>
>   Unless required by applicable law or agreed to in writing, software
>   distributed under the License is distributed on an "AS IS" BASIS,
>   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>   See the License for the specific language governing permissions and
>   limitations under the License.
> -->
>
> <!--
>     Ignite configuration with all defaults and enabled p2p deployment and 
> enabled events.
> -->
> <beans xmlns="http://www.springframework.org/schema/beans";
>        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
>        xmlns:util="http://www.springframework.org/schema/util";
>        xsi:schemaLocation="
>         http://www.springframework.org/schema/beans
>         http://www.springframework.org/schema/beans/spring-beans.xsd
>         http://www.springframework.org/schema/util
>         http://www.springframework.org/schema/util/spring-util.xsd";>
>     <bean abstract="true" id="ignite.cfg" 
> class="org.apache.ignite.configuration.IgniteConfiguration">
>         <!-- Set to true to enable distributed class loading for examples, 
> default is false. -->
>         <property name="peerClassLoadingEnabled" value="true"/>
>
>         <!-- Enable task execution events for examples. -->
>         <property name="includeEventTypes">
>             <list>
>                 <!--Task execution events-->
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>
>
>                 <!--Cache events-->
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
>             </list>
>         </property>
>
>         <!-- Explicitly configure TCP discovery SPI to provide list of 
> initial nodes. -->
>         <property name="discoverySpi">
>             <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
>                 <property name="ipFinder">
>                     <!--
>                         Ignite provides several options for automatic 
> discovery that can be used
>                         instead os static IP based discovery. For information 
> on all options refer
>                         to our documentation: 
> http://apacheignite.readme.io/docs/cluster-config
>                     -->
>                     <!-- Uncomment static IP finder to enable static-based 
> discovery of initial nodes. -->
>                     <!--<bean 
> class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
>                     <bean 
> class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
>                         <property name="addresses">
>                             <list>
>                                 <!-- In distributed environment, replace with 
> actual host IP address. -->
>                                 <value>3.88.248.113:10800</value>
>
>

-- 
Thanks & Regards
Sri Tummala

Reply via email to