Hi Stephen, do you mean something like 3.88.248.113:47500..47700, or just the public IP 3.88.248.113? I tried all of these possibilities, but none of them connects.
Thanks
Sri

On Fri, Oct 18, 2019 at 6:02 AM Stephen Darlington <[email protected]> wrote:

> You’re trying to connect a thick client (the Spark integration) to the
> thin client port (10800). Your example-default.xml file needs to have the
> same configuration as your server node(s).
>
> Regards,
> Stephen
>
> On 17 Oct 2019, at 18:12, sri hari kali charan Tummala <[email protected]> wrote:
>
> Hi Community,
>
> I am trying to read from and write to the Ignite cluster using Apache
> Spark. I am able to do that using the JDBC thin client, but not with the
> native method mentioned in several Spark + Ignite examples.
>
> Right now all the Spark + Ignite examples launch a local Ignite cluster,
> but I want my code to connect to an already existing cluster (as a client).
>
> Question:-
> *How to pass the Ignite connection IP and port (10800) in
> example-default.xml?*
>
> Error:-
> *TcpDiscoverySpi: Failed to connect to any address from IP finder (will
> retry to join topology every 2000 ms; change 'reconnectDelay' to configure
> the frequency of retries): [/3.88.248.113:10800]*
>
> Working (Spark + Ignite using JDBC):-
>
> val df = spark.read
>   .format("jdbc")
>   .option("url", "jdbc:ignite:thin://3.88.248.113")
>   .option("fetchsize",100)
>   //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
>   .option("dbtable", "Person").load()
>
> df.printSchema()
>
> df.createOrReplaceTempView("test")
>
> spark.sql("select * from test where id=1").show(10)
>
> spark.sql("select 4,'blah',124232").show(10)
>
> import java.sql.DriverManager
> val connection = DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")
>
> import java.util.Properties
> val connectionProperties = new Properties()
>
> connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")
>
> spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP")
>   .write.mode(SaveMode.Append)
>   .jdbc("jdbc:ignite:thin://3.88.248.113", "Person", connectionProperties)
>
> spark.read
>   .format("jdbc")
>   .option("url", "jdbc:ignite:thin://3.88.248.113")
>   .option("fetchsize",100)
>   .option("dbtable", "Person").load().show(10,false)
>
>
> Not working (requires a CONFIG file, which is example-default.xml):-
>
> val igniteDF = spark.read
>   .format(FORMAT_IGNITE) //Data source type.
>   .option(OPTION_TABLE, "person") //Table to read.
>   .option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
>   .load()
>   .filter(col("id") >= 2) //Filter clause.
>   .filter(col("name") like "%J%") //Another filter clause.
>
>
> Full code:- the (sparkDSLExample) function fails to connect to the Ignite
> cluster which I already have
>
> package com.ignite.examples.spark
>
> import com.ignite.examples.model.Address
> import org.apache.ignite.{Ignite, Ignition}
> import org.apache.ignite.cache.query.SqlFieldsQuery
> import org.apache.ignite.client.{ClientCache, IgniteClient}
> import org.apache.ignite.configuration.{CacheConfiguration, ClientConfiguration}
> import java.lang.{Long => JLong, String => JString}
>
> import org.apache.ignite.cache.query.SqlFieldsQuery
> import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, OPTION_CONFIG_FILE, OPTION_TABLE}
> import org.apache.log4j.{Level, Logger}
> import org.apache.spark.sql.{SaveMode, SparkSession}
> import org.apache.spark.sql.functions.col
>
> object SparkClientConnectionTest {
>
>   private val CACHE_NAME = "SparkCache"
>
>   private val CONFIG =
>     "/Users/kalit_000/Downloads/designing-event-driven-applications-apache-kafka-ecosystem/05/demos/kafka-streams-after/ApacheIgnitePoc/src/main/scala/com/ignite/examples/config/example-ignite.xml"
>
>   def setupExampleData = {
>
>     val cfg2 = new ClientConfiguration().setAddresses("3.88.248.113:10800")
>     val igniteClient:IgniteClient = Ignition.startClient(cfg2)
>
>     System.out.format(">>> Created cache [%s].\n", CACHE_NAME)
>
>     val cache:ClientCache[Integer, Address] = igniteClient.getOrCreateCache(CACHE_NAME)
>
>     cache.query(new SqlFieldsQuery(String.format("DROP TABLE IF EXISTS Person"))
>       .setSchema("PUBLIC")).getAll
>
>     cache.query(new SqlFieldsQuery(String.format("CREATE TABLE IF NOT EXISTS Person (id LONG,street varchar, zip VARCHAR, PRIMARY KEY (id) ) WITH \"VALUE_TYPE=%s\"", classOf[Address].getName))
>       .setSchema("PUBLIC")).getAll
>
>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(1L.asInstanceOf[JLong],"Jameco", "04074").setSchema("PUBLIC")).getAll
>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(2L.asInstanceOf[JLong],"Bremar road", "520003").setSchema("PUBLIC")).getAll
>     cache.query(new SqlFieldsQuery("INSERT INTO Person(id,street, zip) VALUES(?,?, ?)").setArgs(3L.asInstanceOf[JLong],"orange road", "1234").setSchema("PUBLIC")).getAll
>
>     System.out.format(">>> Data Inserted into Cache [%s].\n", CACHE_NAME)
>
>     val data=cache.query(new SqlFieldsQuery("select * from Person").setSchema("PUBLIC")).getAll
>
>     println(data.toString)
>   }
>
>   def sparkDSLExample(implicit spark: SparkSession): Unit = {
>     println("Querying using Spark DSL.")
>     println
>
>
>     val igniteDF = spark.read
>       .format(FORMAT_IGNITE) //Data source type.
>       .option(OPTION_TABLE, "person") //Table to read.
>       .option(OPTION_CONFIG_FILE, CONFIG) //Ignite config.
>       .load()
>       .filter(col("id") >= 2) //Filter clause.
>       .filter(col("name") like "%J%") //Another filter clause.
>
>     println("Data frame schema:")
>
>     igniteDF.printSchema() //Printing query schema to console.
>
>     println("Data frame content:")
>
>     igniteDF.show() //Printing query results to console.
>   }
>
>
>   def main(args: Array[String]): Unit = {
>
>     setupExampleData
>
>     //Creating spark session.
>     implicit val spark = SparkSession.builder()
>       .appName("Spark Ignite data sources example")
>       .master("local")
>       .config("spark.executor.instances", "2")
>       .getOrCreate()
>
>     // Adjust the logger to exclude the logs of no interest.
>     Logger.getRootLogger.setLevel(Level.ERROR)
>     Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)
>
>     //sparkDSLExample
>
>
>     val df = spark.read
>       .format("jdbc")
>       .option("url", "jdbc:ignite:thin://3.88.248.113")
>       .option("fetchsize",100)
>       //.option("driver", "org.apache.ignite.IgniteJdbcDriver")
>       .option("dbtable", "Person").load()
>
>     df.printSchema()
>
>     df.createOrReplaceTempView("test")
>
>     spark.sql("select * from test where id=1").show(10)
>
>     spark.sql("select 4,'blah',124232").show(10)
>
>     import java.sql.DriverManager
>     val connection = DriverManager.getConnection("jdbc:ignite:thin://3.88.248.113")
>
>     import java.util.Properties
>     val connectionProperties = new Properties()
>
>     connectionProperties.put("url", "jdbc:ignite:thin://3.88.248.113")
>
>     spark.sql("select 4 as ID,'blah' as STREET,124232 as ZIP")
>       .write.mode(SaveMode.Append)
>       .jdbc("jdbc:ignite:thin://3.88.248.113", "Person", connectionProperties)
>
>     spark.read
>       .format("jdbc")
>       .option("url", "jdbc:ignite:thin://3.88.248.113")
>       .option("fetchsize",100)
>       .option("dbtable", "Person").load().show(10,false)
>
>   }
>
> }
>
>
> example-default.xml:-
>
> <?xml version="1.0" encoding="UTF-8"?>
>
> <!--
>   Licensed to the Apache Software Foundation (ASF) under one or more
>   contributor license agreements. See the NOTICE file distributed with
>   this work for additional information regarding copyright ownership.
>   The ASF licenses this file to You under the Apache License, Version 2.0
>   (the "License"); you may not use this file except in compliance with
>   the License. You may obtain a copy of the License at
>
>        http://www.apache.org/licenses/LICENSE-2.0
>
>   Unless required by applicable law or agreed to in writing, software
>   distributed under the License is distributed on an "AS IS" BASIS,
>   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>   See the License for the specific language governing permissions and
>   limitations under the License.
> -->
>
> <!--
>     Ignite configuration with all defaults and enabled p2p deployment and
>     enabled events.
> -->
> <beans xmlns="http://www.springframework.org/schema/beans"
>        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>        xmlns:util="http://www.springframework.org/schema/util"
>        xsi:schemaLocation="
>         http://www.springframework.org/schema/beans
>         http://www.springframework.org/schema/beans/spring-beans.xsd
>         http://www.springframework.org/schema/util
>         http://www.springframework.org/schema/util/spring-util.xsd">
>     <bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
>         <!-- Set to true to enable distributed class loading for examples, default is false. -->
>         <property name="peerClassLoadingEnabled" value="true"/>
>
>         <!-- Enable task execution events for examples. -->
>         <property name="includeEventTypes">
>             <list>
>                 <!--Task execution events-->
>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>
>
>                 <!--Cache events-->
>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
>                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
>             </list>
>         </property>
>
>         <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
>         <property name="discoverySpi">
>             <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
>                 <property name="ipFinder">
>                     <!--
>                         Ignite provides several options for automatic discovery that can be used
>                         instead of static IP based discovery. For information on all options refer
>                         to our documentation: http://apacheignite.readme.io/docs/cluster-config
>                     -->
>                     <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
>                     <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
>                     <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
>                         <property name="addresses">
>                             <list>
>                                 <!-- In distributed environment, replace with actual host IP address. -->
>                                 <value>3.88.248.113:10800</value>
>                             </list>
>                         </property>
>                     </bean>
>                 </property>
>             </bean>
>         </property>
>     </bean>
> </beans>
>
> --
> Thanks & Regards
> Sri Tummala

--
Thanks & Regards
Sri Tummala
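
For reference, here is a minimal sketch of a client-side config along the lines Stephen describes, where the thick client points at the discovery port rather than the thin client port (10800). It rests on assumptions this thread does not confirm: that the server nodes use Ignite's default discovery port 47500 (the range 47500..47509 is illustrative), that the Spark side should join as a client node (clientMode), and that a static IP finder is what the servers use. The ports also have to be reachable from the Spark machine (by default 47500 for discovery and 47100 for communication).

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">
    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <!-- Assumption: join the existing cluster as a client node, not as a server. -->
        <property name="clientMode" value="true"/>

        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <!-- Static IP finder instead of the multicast one. -->
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        <property name="addresses">
                            <list>
                                <!-- Assumption: default discovery port range, not the thin client port 10800. -->
                                <value>3.88.248.113:47500..47509</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>
```

If the server nodes were started with a different discovery port or IP finder, the values above would need to match theirs instead.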
