Do you mean communication ports 47100-47200, as mentioned in
https://www.gridgain.com/docs/8.7.6//installation-guide/manual-install-on-ec2 ?
Bear in mind I am running my Spark job outside the Ignite EC2 box (on my Mac).

Which option is correct in my default.xml file?

Option 1:-

                        <property name="addresses">
                            <list>
                                <!-- In distributed environment,
replace with actual host IP address. -->
                                <value>3.88.248.113:47100..47200</value>
                            </list>
                        </property>


Option 2:-

                        <property name="addresses">
                            <list>
                                <!-- In distributed environment,
replace with actual host IP address. -->
                                <value>3.88.248.113:47500..47600</value>
                            </list>
                        </property>


Option 3:-

                        <property name="addresses">
                            <list>
                                <!-- In distributed environment,
replace with actual host IP address. -->
                                <value>3.88.248.113</value>
                            </list>
                        </property>
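
For reference, 10800 is Ignite's thin-client (JDBC/ODBC) port, 47100..47200 is the default communication port range, and 47500..47600 is the default discovery port range; the TcpDiscoverySpi IP finder should point at the discovery range. A sketch of such an entry, assuming the server node keeps the default discovery ports:

```xml
<!-- Static IP finder entry targeting the default TcpDiscoverySpi
     port range (47500..47600). Assumes the server node was not
     reconfigured to use different discovery ports. -->
<property name="addresses">
    <list>
        <value>3.88.248.113:47500..47600</value>
    </list>
</property>
```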



Thanks

On Fri, Oct 18, 2019 at 9:18 AM sri hari kali charan Tummala <
[email protected]> wrote:

> Hi Stephen ,
>
>  do you mean 3.88.248.113:47500..47700, something like this? Or just the
> public IP 3.88.248.113? I tried all the possibilities; none of them
> are getting connected.
>
> Thanks
> Sri
>
> On Fri, Oct 18, 2019 at 6:02 AM Stephen Darlington <
> [email protected]> wrote:
>
>> You’re trying to connect a thick client (the Spark integration) to the
>> thin client port (10800). Your example-default.xml file needs to have the
>> same configuration as your server node(s).
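
The point being made here is that the thick client joins via discovery, so the ipFinder in example-default.xml has to list the server's discovery endpoint, not 10800. A sketch of that section, assuming the server runs with default TcpDiscoverySpi ports:

```xml
<!-- Sketch: static IP finder pointing at the server's discovery port
     range (default 47500..47600). TcpDiscoveryVmIpFinder is used instead
     of multicast because the server address is known. -->
<property name="discoverySpi">
    <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
        <property name="ipFinder">
            <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                <property name="addresses">
                    <list>
                        <value>3.88.248.113:47500..47600</value>
                    </list>
                </property>
            </bean>
        </property>
    </bean>
</property>
```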
>>
>> Regards,
>> Stephen
>>
>> On 17 Oct 2019, at 18:12, sri hari kali charan Tummala <
>> [email protected]> wrote:
>>
>> Hi Community,
>>
>> I am trying to read from and write to the Ignite cluster using Apache
>> Spark. I am able to do that using the JDBC thin client, but not the
>> native method mentioned in several Spark + Ignite examples.
>>
>> Right now all the Spark + Ignite examples launch a local Ignite cluster,
>> but I want my code to connect to an already existing cluster (as a client).
>>
>> Question:-
>> *How to pass the Ignite connection IP and port (10800) in
>> example-default.xml?*
>>
>> Error:-
>> *TcpDiscoverySpi: Failed to connect to any address from IP finder (will
>> retry to join topology every 2000 ms; change 'reconnectDelay' to configure
>> the frequency of retries): [/3.88.248.113:10800]*
>>
>> Working (Spark + Ignite using JDBC):-
>>
>> val df = spark.read
>> .format("jdbc")
>> .option("url", "jdbc:ignite:thin://3.88.248.113")
>> .option("fetchsize",100)
>> //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
>> .option("dbtable", "Person").load()
>>
>> df.printSchema()
>>
>> df.createOrReplaceTempView("test")
>>
>> spark.sql("select * from test where id=1").show(10)
>>
>> spark.sql("select 4,'blah',124232").show(10)
>>
>> import java.sql.DriverManager
>> val connection = DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")
>>
>> import java.util.Properties
>> val connectionProperties = new Properties()
>>
>> connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")
>>
>> spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP")
>>   .write.mode(SaveMode.Append)
>>   .jdbc("jdbc:ignite:thin://3.88.248.113", "Person", connectionProperties)
>>
>> spark.read
>>   .format("jdbc")
>>   .option("url", "jdbc:ignite:thin://3.88.248.113")
>>   .option("fetchsize",100)
>>   .option("dbtable", "Person").load().show(10,false)
>>
>>
>> Not Working requires a CONFIG file which is example-default.xml:-
>>
>> val igniteDF = spark.read
>>   .format(FORMAT_IGNITE) //Data source type.
>>   .option(OPTION_TABLE, "person") //Table to read.
>>   .option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
>>   .load()
>>   .filter(col("id") >= 2) //Filter clause.
>>   .filter(col("name") like "%J%") //Another filter clause.
>>
>>
>> Full Code:- (sparkDSLExample) function fails to connect ignite cluster
>> which I already have
>>
>> package com.ignite.examples.spark
>>
>> import com.ignite.examples.model.Address
>> import org.apache.ignite.{Ignite, Ignition}
>> import org.apache.ignite.cache.query.SqlFieldsQuery
>> import org.apache.ignite.client.{ClientCache, IgniteClient}
>> import org.apache.ignite.configuration.{CacheConfiguration, ClientConfiguration}
>> import java.lang.{Long => JLong, String => JString}
>>
>> import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, OPTION_CONFIG_FILE, OPTION_TABLE}
>> import org.apache.log4j.{Level, Logger}
>> import org.apache.spark.sql.{SaveMode, SparkSession}
>> import org.apache.spark.sql.functions.col
>>
>> object SparkClientConnectionTest {
>>
>>   private val CACHE_NAME = "SparkCache"
>>
>>   private val CONFIG = 
>> "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"
>>
>>   def setupExampleData = {
>>
>>     val cfg2 = new ClientConfiguration().setAddresses("3.88.248.113:10800")
>>     val igniteClient:IgniteClient = Ignition.startClient(cfg2)
>>
>>     System.out.format(">>> Created cache [%s].\n", CACHE_NAME)
>>
>>     val cache:ClientCache[Integer, Address] = 
>> igniteClient.getOrCreateCache(CACHE_NAME)
>>
>>     cache.query(new SqlFieldsQuery("DROP TABLE IF EXISTS Person")
>>       .setSchema("PUBLIC")).getAll
>>
>>     cache.query(new SqlFieldsQuery(String.format(
>>       "CREATE TABLE IF NOT EXISTS Person (id LONG, street VARCHAR, zip VARCHAR, PRIMARY KEY (id)) WITH \"VALUE_TYPE=%s\"",
>>       classOf[Address].getName))
>>       .setSchema("PUBLIC")).getAll
>>
>>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id, street, zip) VALUES(?, ?, ?)")
>>       .setArgs(1L.asInstanceOf[JLong], "Jameco", "04074").setSchema("PUBLIC")).getAll
>>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id, street, zip) VALUES(?, ?, ?)")
>>       .setArgs(2L.asInstanceOf[JLong], "Bremar road", "520003").setSchema("PUBLIC")).getAll
>>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id, street, zip) VALUES(?, ?, ?)")
>>       .setArgs(3L.asInstanceOf[JLong], "orange road", "1234").setSchema("PUBLIC")).getAll
>>
>>     System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME)
>>
>>     val data = cache.query(new SqlFieldsQuery("select * from Person").setSchema("PUBLIC")).getAll
>>
>>     println(data.toString)
>>   }
>>
>>   def sparkDSLExample(implicit spark: SparkSession): Unit = {
>>     println("Querying using Spark DSL.")
>>     println
>>
>>
>>     val igniteDF = spark.read
>>       .format(FORMAT_IGNITE) //Data source type.
>>       .option(OPTION_TABLE, "person") //Table to read.
>>       .option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
>>       .load()
>>       .filter(col("id") >= 2) //Filter clause.
>>       .filter(col("name") like "%J%") //Another filter clause.
>>
>>     println("Data frame schema:")
>>
>>     igniteDF.printSchema() //Printing query schema to console.
>>
>>     println("Data frame content:")
>>
>>     igniteDF.show() //Printing query results to console.
>>   }
>>
>>
>>   def main(args: Array[String]): Unit = {
>>
>>     setupExampleData
>>
>>     //Creating spark session.
>>     implicit val spark = SparkSession.builder()
>>       .appName("Spark Ignite data sources example")
>>       .master("local")
>>       .config("spark.executor.instances", "2")
>>       .getOrCreate()
>>
>>     // Adjust the logger to exclude the logs of no interest.
>>     Logger.getRootLogger.setLevel(Level.ERROR)
>>     Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)
>>
>>     //sparkDSLExample
>>
>>
>>     val df = spark.read
>>     .format("jdbc")
>>     .option("url", "jdbc:ignite:thin://3.88.248.113")
>>     .option("fetchsize",100)
>>     //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
>>     .option("dbtable", "Person").load()
>>
>>     df.printSchema()
>>
>>     df.createOrReplaceTempView("test")
>>
>>     spark.sql("select * from test where id=1").show(10)
>>
>>     spark.sql("select 4,'blah',124232").show(10)
>>
>>     import java.sql.DriverManager
>>     val connection = DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")
>>
>>     import java.util.Properties
>>     val connectionProperties = new Properties()
>>
>>     connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")
>>
>>     spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP")
>>       .write.mode(SaveMode.Append)
>>       .jdbc("jdbc:ignite:thin://3.88.248.113", "Person", connectionProperties)
>>
>>     spark.read
>>       .format("jdbc")
>>       .option("url", "jdbc:ignite:thin://3.88.248.113")
>>       .option("fetchsize",100)
>>       .option("dbtable", "Person").load().show(10,false)
>>
>>   }
>>
>> }
>>
>>
>> example-default.xml:-
>>
>> <?xml version="1.0" encoding="UTF-8"?>
>>
>> <!--
>>   Licensed to the Apache Software Foundation (ASF) under one or more
>>   contributor license agreements.  See the NOTICE file distributed with
>>   this work for additional information regarding copyright ownership.
>>   The ASF licenses this file to You under the Apache License, Version 2.0
>>   (the "License"); you may not use this file except in compliance with
>>   the License.  You may obtain a copy of the License at
>>
>>        http://www.apache.org/licenses/LICENSE-2.0
>>
>>   Unless required by applicable law or agreed to in writing, software
>>   distributed under the License is distributed on an "AS IS" BASIS,
>>   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>   See the License for the specific language governing permissions and
>>   limitations under the License.
>> -->
>>
>> <!--
>>     Ignite configuration with all defaults and enabled p2p deployment and 
>> enabled events.
>> -->
>> <beans xmlns="http://www.springframework.org/schema/beans"
>>        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>        xmlns:util="http://www.springframework.org/schema/util"
>>        xsi:schemaLocation="
>>         http://www.springframework.org/schema/beans
>>         http://www.springframework.org/schema/beans/spring-beans.xsd
>>         http://www.springframework.org/schema/util
>>         http://www.springframework.org/schema/util/spring-util.xsd">
>>     <bean abstract="true" id="ignite.cfg" 
>> class="org.apache.ignite.configuration.IgniteConfiguration">
>>         <!-- Set to true to enable distributed class loading for examples, 
>> default is false. -->
>>         <property name="peerClassLoadingEnabled" value="true"/>
>>
>>         <!-- Enable task execution events for examples. -->
>>         <property name="includeEventTypes">
>>             <list>
>>                 <!--Task execution events-->
>>                 <util:constant 
>> static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
>>                 <util:constant 
>> static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
>>                 <util:constant 
>> static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
>>                 <util:constant 
>> static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
>>                 <util:constant 
>> static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
>>                 <util:constant 
>> static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>
>>
>>                 <!--Cache events-->
>>                 <util:constant 
>> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
>>                 <util:constant 
>> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
>>                 <util:constant 
>> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
>>             </list>
>>         </property>
>>
>>         <!-- Explicitly configure TCP discovery SPI to provide list of 
>> initial nodes. -->
>>         <property name="discoverySpi">
>>             <bean 
>> class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
>>                 <property name="ipFinder">
>>                     <!--
>>                         Ignite provides several options for automatic discovery that can be used
>>                         instead of static IP based discovery. For information on all options refer
>>                         to our documentation: http://apacheignite.readme.io/docs/cluster-config
>>                     -->
>>                     <!-- Uncomment static IP finder to enable static-based 
>> discovery of initial nodes. -->
>>                     <!--<bean 
>> class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
>>                     <bean 
>> class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
>>                         <property name="addresses">
>>                             <list>
>>                                 <!-- In distributed environment, replace 
>> with actual host IP address. -->
>>                                 <value>3.88.248.113:10800</value>
>>                             </list>
>>                         </property>
>>                     </bean>
>>                 </property>
>>             </bean>
>>         </property>
>>     </bean>
>> </beans>
>>
>>
>>
>>
>> --
>> Thanks & Regards
>> Sri Tummala
>>
>>
>>
>>
>
> --
> Thanks & Regards
> Sri Tummala
>
>

-- 
Thanks & Regards
Sri Tummala
