Do you mean the communication ports 47100-47200, as mentioned here?
https://www.gridgain.com/docs/8.7.6//installation-guide/manual-install-on-ec2
Bear in mind that I am running my Spark job outside the Ignite EC2 box (on a Mac).
Which option is right in my default.xml file?
Option 1:-
<property name="addresses">
<list>
<!-- In distributed environment,
replace with actual host IP address. -->
<value>3.88.248.113:47100..47200</value>
</list>
</property>
Option 2:-
<property name="addresses">
<list>
<!-- In distributed environment,
replace with actual host IP address. -->
<value>3.88.248.113:47500..47600</value>
</list>
</property>
Option 3:-
<property name="addresses">
<list>
<!-- In distributed environment,
replace with actual host IP address. -->
<value>3.88.248.113</value>
</list>
</property>
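To narrow down which of these addresses can work at all, it may help to check which ports are reachable from the machine running Spark. The sketch below is only an illustration using plain java.net sockets: the object name PortCheck, its helper names and the 2-second timeout are hypothetical, and the port numbers assume Ignite's defaults (47500 for discovery, 47100 for communication, 10800 for the thin client), which may not match the actual server configuration. A successful connect only shows that the port is open; it does not prove that a thick client can join the topology.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper for illustration; not part of Ignite or Spark.
object PortCheck {

  // Expands "47500..47509" into the ports 47500 to 47509;
  // a single port such as "10800" yields a one-element sequence.
  def parsePorts(spec: String): Seq[Int] = spec.split("\\.\\.") match {
    case Array(single)   => Seq(single.trim.toInt)
    case Array(from, to) => from.trim.toInt to to.trim.toInt
    case _               => throw new IllegalArgumentException(s"Bad port spec: $spec")
  }

  // Returns true if a TCP connection to host:port is established within timeoutMs.
  def isReachable(host: String, port: Int, timeoutMs: Int = 2000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      case _: IOException => false // refused, timed out, or host not resolvable
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Host to probe; falls back to localhost when no argument is given.
    val host = args.headOption.getOrElse("127.0.0.1")
    // Assumed Ignite defaults: discovery, communication, thin client.
    val specs = Seq("47500..47509", "47100..47109", "10800")
    for (spec <- specs; port <- parsePorts(spec))
      println(s"$host:$port reachable=${isReachable(host, port)}")
  }
}
```

Passing 3.88.248.113 as the first argument probes the server from this thread. If only 10800 reports as reachable, that would be consistent with the JDBC thin client working while TcpDiscoverySpi fails to join, and the EC2 security group would be the next thing to look at.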
Thanks
On Fri, Oct 18, 2019 at 9:18 AM sri hari kali charan Tummala <
[email protected]> wrote:
> Hi Stephen ,
>
> Do you mean 3.88.248.113:47500..47700,
> something like this? Or just the public IP 3.88.248.113?
> I tried all the possibilities; none of them
> connect.
>
> Thanks
> Sri
>
> On Fri, Oct 18, 2019 at 6:02 AM Stephen Darlington <
> [email protected]> wrote:
>
>> You’re trying to connect a thick client (the Spark integration) to the
>> thin client port (10800). Your example-default.xml file needs to have the
>> same configuration as your server node(s).
>>
>> Regards,
>> Stephen
>>
>> On 17 Oct 2019, at 18:12, sri hari kali charan Tummala <
>> [email protected]> wrote:
>>
>> Hi Community,
>>
>> I am trying to read from and write to an Ignite cluster using Apache Spark.
>> I am able to do that using the JDBC thin client, but not with the native
>> method mentioned in several Spark + Ignite examples.
>>
>> Right now all the Spark + Ignite examples launch a local Ignite cluster,
>> but I want my code to connect to an already existing cluster (as a client).
>>
>> Question:-
>> *How do I pass the Ignite connection IP and port (10800) in
>> example-default.xml?*
>>
>> Error:-
>> *TcpDiscoverySpi: Failed to connect to any address from IP finder (will
>> retry to join topology every 2000 ms; change 'reconnectDelay' to configure
>> the frequency of retries): [/3.88.248.113:10800]*
>>
>> Working (Spark + Ignite using JDBC):-
>>
>> val df = spark.read
>> .format("jdbc")
>> .option("url", "jdbc:ignite:thin://3.88.248.113")
>> .option("fetchsize",100)
>> //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
>> .option("dbtable", "Person").load()
>>
>> df.printSchema()
>>
>> df.createOrReplaceTempView("test")
>>
>> spark.sql("select * from test where id=1").show(10)
>>
>> spark.sql("select 4,'blah',124232").show(10)
>>
>> import java.sql.DriverManager
>> val connection =
>> DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")
>>
>> import java.util.Properties
>> val connectionProperties = new Properties()
>>
>> connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")
>>
>> spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP")
>> .write.mode(SaveMode.Append)
>> .jdbc("jdbc:ignite:thin://3.88.248.113", "Person", connectionProperties)
>>
>> spark.read
>> .format("jdbc")
>> .option("url", "jdbc:ignite:thin://3.88.248.113")
>> .option("fetchsize",100)
>> .option("dbtable", "Person").load().show(10,false)
>>
>>
>> Not working (requires a CONFIG file, which is example-default.xml):-
>>
>> val igniteDF = spark.read
>> .format(FORMAT_IGNITE) //Data source type.
>> .option(OPTION_TABLE, "person") //Table to read.
>> .option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
>> .load()
>> .filter(col("id") >= 2) //Filter clause.
>> .filter(col("street") like "%J%") //Another filter clause (Person has no "name" column).
>>
>>
>> Full code:- the sparkDSLExample function fails to connect to the Ignite
>> cluster which I already have
>>
>> package com.ignite.examples.spark
>>
>> import com.ignite.examples.model.Address
>> import org.apache.ignite.{Ignite, Ignition}
>> import org.apache.ignite.cache.query.SqlFieldsQuery
>> import org.apache.ignite.client.{ClientCache, IgniteClient}
>> import org.apache.ignite.configuration.{CacheConfiguration,
>> ClientConfiguration}
>> import java.lang.{Long => JLong, String => JString}
>>
>> import org.apache.ignite.cache.query.SqlFieldsQuery
>> import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE,
>> OPTION_CONFIG_FILE, OPTION_TABLE}
>> import org.apache.log4j.{Level, Logger}
>> import org.apache.spark.sql.{SaveMode, SparkSession}
>> import org.apache.spark.sql.functions.col
>>
>> object SparkClientConnectionTest {
>>
>> private val CACHE_NAME = "SparkCache"
>>
>> private val CONFIG =
>> "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"
>>
>> def setupExampleData = {
>>
>> val cfg2 = new ClientConfiguration().setAddresses("3.88.248.113:10800")
>> val igniteClient:IgniteClient = Ignition.startClient(cfg2)
>>
>> System.out.format(">>> Created cache [%s].\n", CACHE_NAME)
>>
>> val cache:ClientCache[Integer, Address] =
>> igniteClient.getOrCreateCache(CACHE_NAME)
>>
>> cache.query(new SqlFieldsQuery(String.format("DROP TABLE IF EXISTS Person"))
>> .setSchema("PUBLIC")).getAll
>>
>> cache.query(new SqlFieldsQuery(String.format(
>> "CREATE TABLE IF NOT EXISTS Person (id LONG,street varchar, zip VARCHAR, PRIMARY KEY (id) ) " +
>> "WITH \"VALUE_TYPE=%s\"", classOf[Address].getName))
>> .setSchema("PUBLIC")).getAll
>>
>> cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)")
>> .setArgs(1L.asInstanceOf[JLong],"Jameco", "04074").setSchema("PUBLIC")).getAll
>> cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)")
>> .setArgs(2L.asInstanceOf[JLong],"Bremar road", "520003").setSchema("PUBLIC")).getAll
>> cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)")
>> .setArgs(3L.asInstanceOf[JLong],"orange road", "1234").setSchema("PUBLIC")).getAll
>>
>> System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME)
>>
>> val data=cache.query(new SqlFieldsQuery("select * from Person")
>> .setSchema("PUBLIC")).getAll
>>
>> println(data.toString)
>> }
>>
>> def sparkDSLExample(implicit spark: SparkSession): Unit = {
>> println("Querying using Spark DSL.")
>> println
>>
>>
>> val igniteDF = spark.read
>> .format(FORMAT_IGNITE) //Data source type.
>> .option(OPTION_TABLE, "person") //Table to read.
>> .option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
>> .load()
>> .filter(col("id") >= 2) //Filter clause.
>> .filter(col("street") like "%J%") //Another filter clause (Person has no "name" column).
>>
>> println("Data frame schema:")
>>
>> igniteDF.printSchema() //Printing query schema to console.
>>
>> println("Data frame content:")
>>
>> igniteDF.show() //Printing query results to console.
>> }
>>
>>
>> def main(args: Array[String]): Unit = {
>>
>> setupExampleData
>>
>> //Creating spark session.
>> implicit val spark = SparkSession.builder()
>> .appName("Spark Ignite data sources example")
>> .master("local")
>> .config("spark.executor.instances", "2")
>> .getOrCreate()
>>
>> // Adjust the logger to exclude the logs of no interest.
>> Logger.getRootLogger.setLevel(Level.ERROR)
>> Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)
>>
>> //sparkDSLExample
>>
>>
>> val df = spark.read
>> .format("jdbc")
>> .option("url", "jdbc:ignite:thin://3.88.248.113")
>> .option("fetchsize",100)
>> //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
>> .option("dbtable", "Person").load()
>>
>> df.printSchema()
>>
>> df.createOrReplaceTempView("test")
>>
>> spark.sql("select * from test where id=1").show(10)
>>
>> spark.sql("select 4,'blah',124232").show(10)
>>
>> import java.sql.DriverManager
>> val connection =
>> DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")
>>
>> import java.util.Properties
>> val connectionProperties = new Properties()
>>
>> connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")
>>
>> spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP")
>> .write.mode(SaveMode.Append)
>> .jdbc("jdbc:ignite:thin://3.88.248.113", "Person", connectionProperties)
>>
>> spark.read
>> .format("jdbc")
>> .option("url", "jdbc:ignite:thin://3.88.248.113")
>> .option("fetchsize",100)
>> .option("dbtable", "Person").load().show(10,false)
>>
>> }
>>
>> }
>>
>>
>> example-default.xml:-
>>
>> <?xml version="1.0" encoding="UTF-8"?>
>>
>> <!--
>> Licensed to the Apache Software Foundation (ASF) under one or more
>> contributor license agreements. See the NOTICE file distributed with
>> this work for additional information regarding copyright ownership.
>> The ASF licenses this file to You under the Apache License, Version 2.0
>> (the "License"); you may not use this file except in compliance with
>> the License. You may obtain a copy of the License at
>>
>> http://www.apache.org/licenses/LICENSE-2.0
>>
>> Unless required by applicable law or agreed to in writing, software
>> distributed under the License is distributed on an "AS IS" BASIS,
>> WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> See the License for the specific language governing permissions and
>> limitations under the License.
>> -->
>>
>> <!--
>> Ignite configuration with all defaults and enabled p2p deployment and
>> enabled events.
>> -->
>> <beans xmlns="http://www.springframework.org/schema/beans"
>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>> xmlns:util="http://www.springframework.org/schema/util"
>> xsi:schemaLocation="
>> http://www.springframework.org/schema/beans
>> http://www.springframework.org/schema/beans/spring-beans.xsd
>> http://www.springframework.org/schema/util
>> http://www.springframework.org/schema/util/spring-util.xsd">
>> <bean abstract="true" id="ignite.cfg"
>> class="org.apache.ignite.configuration.IgniteConfiguration">
>> <!-- Set to true to enable distributed class loading for examples,
>> default is false. -->
>> <property name="peerClassLoadingEnabled" value="true"/>
>>
>> <!-- Enable task execution events for examples. -->
>> <property name="includeEventTypes">
>> <list>
>> <!--Task execution events-->
>> <util:constant
>> static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
>> <util:constant
>> static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
>> <util:constant
>> static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
>> <util:constant
>> static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
>> <util:constant
>> static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
>> <util:constant
>> static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>
>>
>> <!--Cache events-->
>> <util:constant
>> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
>> <util:constant
>> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
>> <util:constant
>> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
>> </list>
>> </property>
>>
>> <!-- Explicitly configure TCP discovery SPI to provide list of
>> initial nodes. -->
>> <property name="discoverySpi">
>> <bean
>> class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
>> <property name="ipFinder">
>> <!--
>> Ignite provides several options for automatic
>> discovery that can be used
>> instead of static IP based discovery. For
>> information on all options refer
>> to our documentation:
>> http://apacheignite.readme.io/docs/cluster-config
>> -->
>> <!-- Uncomment static IP finder to enable static-based
>> discovery of initial nodes. -->
>> <!--<bean
>> class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
>> <bean
>> class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
>> <property name="addresses">
>> <list>
>> <!-- In distributed environment, replace
>> with actual host IP address. -->
>> <value>3.88.248.113:10800</value>
>> </list>
>> </property>
>> </bean>
>> </property>
>> </bean>
>> </property>
>> </bean>
>> </beans>
>>
>>
>>
>>
>> --
>> Thanks & Regards
>> Sri Tummala
>>
>>
>>
>>
>
> --
> Thanks & Regards
> Sri Tummala
>
>
--
Thanks & Regards
Sri Tummala