Hi Stephen ,

 do you mean 3.88.248.113: <http://3.88.248.113:10800/>47500..47700
something like this? or just public ip 3.88.248.113
<http://3.88.248.113:10800/> I tried all the possibilities none of them are
getting connected.

Thanks
Sri

On Fri, Oct 18, 2019 at 6:02 AM Stephen Darlington <
[email protected]> wrote:

> You’re trying to connect a thick client (the Spark integration) to the
> thin client port (10800). Your example-default.xml file needs to have the
> same configuration as your server node(s).
>
> Regards,
> Stephen
>
> On 17 Oct 2019, at 18:12, sri hari kali charan Tummala <
> [email protected]> wrote:
>
> Hi Community,
>
> I am trying to read and write into the Ignite cluster using apache-spark I
> am able to do that using JDBC thin client but not native method as
> mentioned in several spark + ignite examples.
>
> Right now all the spark + ignite examples launch a local ignite cluster
> but I want my code connecting to already existing cluster (client).
>
> Question:-
> *How to pass Ignite connection ip and port (10800)  10800 in
> example-default.xml ?*
>
> Error:-
> *TcpDiscoverySpi: Failed to connect to any address from IP finder (will
> retry to join topology every 2000 ms; change 'reconnectDelay' to configure
> the frequency of retries): [/3.88.248.113:10800
> <http://3.88.248.113:10800/>]*
>
> Working (Spark + Ignite using JDBC):-
>
> val df = spark.read
> .format("jdbc")
> .option("url", "jdbc:ignite:thin://3.88.248.113")
> .option("fetchsize",100)
> //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
> .option("dbtable", "Person").load()
>
> df.printSchema()
>
> df.createOrReplaceTempView("test")
>
> spark.sql("select * from test where id=1").show(10)
>
> spark.sql("select 4,'blah',124232").show(10)
>
> import java.sql.DriverManager
> val connection = 
> DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")
>
> import java.util.Properties
> val connectionProperties = new Properties()
>
> connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")
>
> spark.sql("select 4 as ID,'blah' as STREET,124232 as 
> ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://3.88.248.113",
>   "Person",connectionProperties)
>
> spark.read
>   .format("jdbc")
>   .option("url", "jdbc:ignite:thin://3.88.248.113")
>   .option("fetchsize",100)
>   .option("dbtable", "Person").load().show(10,false)
>
>
> Not Working requires a CONFIG file which is example-default.xml:-
>
> val igniteDF = spark.read
>   .format(FORMAT_IGNITE) //Data source type.
>   .option(OPTION_TABLE, "person") //Table to read.
>   .option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
>   .load()
>   .filter(col("id") >= 2) //Filter clause.
>   .filter(col("name") like "%J%") //Another filter clause.
>
>
> Full Code:- (sparkDSLExample) function fails to connect ignite cluster
> which I already have
>
> package com.ignite.examples.spark
>
> import com.ignite.examples.model.Address
> import org.apache.ignite.{Ignite, Ignition}
> import org.apache.ignite.cache.query.SqlFieldsQuery
> import org.apache.ignite.client.{ClientCache, IgniteClient}
> import org.apache.ignite.configuration.{CacheConfiguration, 
> ClientConfiguration}
> import java.lang.{Long => JLong, String => JString}
>
> import org.apache.ignite.cache.query.SqlFieldsQuery
> import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, 
> OPTION_CONFIG_FILE, OPTION_TABLE}
> import org.apache.log4j.{Level, Logger}
> import org.apache.spark.sql.{SaveMode, SparkSession}
> import org.apache.spark.sql.functions.col
>
> object SparkClientConnectionTest {
>
>   private val CACHE_NAME = "SparkCache"
>
>   private val CONFIG = 
> "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"
>
>   def setupExampleData = {
>
>     val cfg2 = new ClientConfiguration().setAddresses("3.88.248.113:10800")
>     val igniteClient:IgniteClient = Ignition.startClient(cfg2)
>
>     System.out.format(">>> Created cache [%s].\n", CACHE_NAME)
>
>     val cache:ClientCache[Integer, Address] = 
> igniteClient.getOrCreateCache(CACHE_NAME)
>
>     cache.query(new SqlFieldsQuery(String.format("DROP TABLE IF EXISTS 
> Person"))
>       .setSchema("PUBLIC")).getAll
>
>     cache.query(new SqlFieldsQuery(String.format("CREATE TABLE IF NOT EXISTS 
> Person (id LONG,street varchar, zip VARCHAR, PRIMARY KEY (id) ) WITH 
> \"VALUE_TYPE=%s\"", classOf[Address].getName))
>       .setSchema("PUBLIC")).getAll
>
>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) 
> VALUES(?,?, ?)").setArgs(1L.asInstanceOf[JLong],"Jameco", 
> "04074").setSchema("PUBLIC")).getAll
>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) 
> VALUES(?,?, ?)").setArgs(2L.asInstanceOf[JLong],"Bremar road", 
> "520003").setSchema("PUBLIC")).getAll
>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) 
> VALUES(?,?, ?)").setArgs(3L.asInstanceOf[JLong],"orange road", 
> "1234").setSchema("PUBLIC")).getAll
>
>     System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME)
>
>     val data=cache.query(new SqlFieldsQuery("select * from 
> Person").setSchema("PUBLIC")).getAll
>
>     println(data.toString)
>   }
>
>   def sparkDSLExample(implicit spark: SparkSession): Unit = {
>     println("Querying using Spark DSL.")
>     println
>
>
>     val igniteDF = spark.read
>       .format(FORMAT_IGNITE) //Data source type.
>       .option(OPTION_TABLE, "person") //Table to read.
>       .option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
>       .load()
>       .filter(col("id") >= 2) //Filter clause.
>       .filter(col("name") like "%J%") //Another filter clause.
>
>     println("Data frame schema:")
>
>     igniteDF.printSchema() //Printing query schema to console.
>
>     println("Data frame content:")
>
>     igniteDF.show() //Printing query results to console.
>   }
>
>
>   def main(args: Array[String]): Unit = {
>
>     setupExampleData
>
>     //Creating spark session.
>     implicit val spark = SparkSession.builder()
>       .appName("Spark Ignite data sources example")
>       .master("local")
>       .config("spark.executor.instances", "2")
>       .getOrCreate()
>
>     // Adjust the logger to exclude the logs of no interest.
>     Logger.getRootLogger.setLevel(Level.ERROR)
>     Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)
>
>     //sparkDSLExample
>
>
>     val df = spark.read
>     .format("jdbc")
>     .option("url", "jdbc:ignite:thin://3.88.248.113")
>     .option("fetchsize",100)
>     //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
>     .option("dbtable", "Person").load()
>
>     df.printSchema()
>
>     df.createOrReplaceTempView("test")
>
>     spark.sql("select * from test where id=1").show(10)
>
>     spark.sql("select 4,'blah',124232").show(10)
>
>     import java.sql.DriverManager
>     val connection = 
> DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")
>
>     import java.util.Properties
>     val connectionProperties = new Properties()
>
>     connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")
>
>     spark.sql("select 4 as ID,'blah' as STREET,124232 as 
> ZIP").write.mode(SaveMode.Append).jdbc("jdbc:ignite:thin://3.88.248.113",
>       "Person",connectionProperties)
>
>     spark.read
>       .format("jdbc")
>       .option("url", "jdbc:ignite:thin://3.88.248.113")
>       .option("fetchsize",100)
>       .option("dbtable", "Person").load().show(10,false)
>
>   }
>
> }
>
>
> example-default.xml:-
>
> <?xml version="1.0" encoding="UTF-8"?>
>
> <!--
>   Licensed to the Apache Software Foundation (ASF) under one or more
>   contributor license agreements.  See the NOTICE file distributed with
>   this work for additional information regarding copyright ownership.
>   The ASF licenses this file to You under the Apache License, Version 2.0
>   (the "License"); you may not use this file except in compliance with
>   the License.  You may obtain a copy of the License at
>
>        http://www.apache.org/licenses/LICENSE-2.0
>
>   Unless required by applicable law or agreed to in writing, software
>   distributed under the License is distributed on an "AS IS" BASIS,
>   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>   See the License for the specific language governing permissions and
>   limitations under the License.
> -->
>
> <!--
>     Ignite configuration with all defaults and enabled p2p deployment and 
> enabled events.
> -->
> <beans xmlns="http://www.springframework.org/schema/beans";
>        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
>        xmlns:util="http://www.springframework.org/schema/util";
>        xsi:schemaLocation="
>         http://www.springframework.org/schema/beans
>         http://www.springframework.org/schema/beans/spring-beans.xsd
>         http://www.springframework.org/schema/util
>         http://www.springframework.org/schema/util/spring-util.xsd";>
>     <bean abstract="true" id="ignite.cfg" 
> class="org.apache.ignite.configuration.IgniteConfiguration">
>         <!-- Set to true to enable distributed class loading for examples, 
> default is false. -->
>         <property name="peerClassLoadingEnabled" value="true"/>
>
>         <!-- Enable task execution events for examples. -->
>         <property name="includeEventTypes">
>             <list>
>                 <!--Task execution events-->
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>
>
>                 <!--Cache events-->
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
>                 <util:constant 
> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
>             </list>
>         </property>
>
>         <!-- Explicitly configure TCP discovery SPI to provide list of 
> initial nodes. -->
>         <property name="discoverySpi">
>             <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
>                 <property name="ipFinder">
>                     <!--
>                         Ignite provides several options for automatic 
> discovery that can be used
>                         instead os static IP based discovery. For information 
> on all options refer
>                         to our documentation: 
> http://apacheignite.readme.io/docs/cluster-config
>                     -->
>                     <!-- Uncomment static IP finder to enable static-based 
> discovery of initial nodes. -->
>                     <!--<bean 
> class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
>                     <bean 
> class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
>                         <property name="addresses">
>                             <list>
>                                 <!-- In distributed environment, replace with 
> actual host IP address. -->
>                                 <value>3.88.248.113:10800</value>
>                             </list>
>                         </property>
>                     </bean>
>                 </property>
>             </bean>
>         </property>
>     </bean>
> </beans>
>
>
>
>
> --
> Thanks & Regards
> Sri Tummala
>
>
>
>

-- 
Thanks & Regards
Sri Tummala

Reply via email to