package com.test;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spark.IgniteContext;
import org.apache.ignite.spark.IgniteDataFrameSettings;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import
org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.ignite.IgniteSparkSession;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.inn.sparkrunner.context.JobContext;
import com.inn.sparkrunner.context.JobContextImpl;
import com.inn.sparkrunner.createsparkcontext.CreateSparkContext;
import com.inn.sparkrunner.util.ConfigUtil;

public class JoinUsingIgnite {

        public static void main(String[] args) throws SecurityException, 
Exception
{
                Logger logger = LoggerFactory.getLogger(JoinUsingIgnite.class);

                ConfigUtil.setConfig();
                System.setProperty("IGNITE_HOME",
"/opt/kylo/testignite/apache-ignite-fabric-2.5.0-bin");
                
                JobContext jobContext = null;
                SparkSession sparkSession = null;
                sparkSession = CreateSparkContext.create(args);
                jobContext = new JobContextImpl(sparkSession);
                TcpDiscoverySpi spi = new TcpDiscoverySpi();
                TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
                ipFinder.setAddresses(java.util.Arrays.asList("sf3.start.com",
"sf2.start.com"));
                spi.setIpFinder(ipFinder);

                IgniteConfiguration cfg = new IgniteConfiguration();
                // cfg.setIgniteInstanceName("grid");
                cfg.setDiscoverySpi(spi);
                
                Ignite ignite = Ignition.start(cfg);
                IgniteContext igniteContext = new
IgniteContext(sparkSession.sparkContext(),
                        
"/opt/kylo/testignite/apache-ignite-fabric-2.5.0-bin/config/default-config.xml");

                IgniteSparkSession igniteSparkSession = new
IgniteSparkSession(igniteContext, sparkSession);

                Properties connectionproperties = new Properties();

                connectionproperties.put("user", "FORESIGHT_PRODUCT");
                connectionproperties.put("password", "FORESIGHT_PRODUCT@#^249");
                connectionproperties.setProperty("Driver", 
"com.mysql.jdbc.Driver");

                logger.info("spark start------");

                Dataset<Row> jdbc = 
igniteSparkSession.read().option("inferschema",
"false").jdbc(
                                "jdbc:mysql://192.168.4.249:3306/FORESIGHT", 
"(SELECT * FROM
NetworkElement) emp",
                                connectionproperties);
                

                Dataset<Row> jdbc1 = 
igniteSparkSession.read().option("inferschema",
"false").jdbc(
                                "jdbc:mysql://192.168.4.249:3306/FORESIGHT", 
"(SELECT * FROM
MacroSiteDetail) emp1",
                                connectionproperties);

                
                CacheConfiguration setSqlSchema = new 
CacheConfiguration("cachecfg")
                                .setSqlSchema("PUBLIC");

                IgniteCache cache = ignite.getOrCreateCache(setSqlSchema);
        
        
                CacheConfiguration setSqlSchema2 = new 
CacheConfiguration("table2")
                                .setSqlSchema("PUBLIC");
                IgniteCache cache2 = ignite.getOrCreateCache(setSqlSchema2);
                        
                creatingTable(jdbc, cache, "ignitetab1", "networkelementid_pk");
                SqlFieldsQuery creatingInsertingCommand = 
creatingInsertingCommand(jdbc,
cache, "ignitetab1");
                InsertingData(jdbc, cache, creatingInsertingCommand);
                
                creatingTable(jdbc1, cache2, "ignitetab2", 
"macrositedetailid_pk");
                SqlFieldsQuery creatingInsertingCommand1 = 
creatingInsertingCommand(jdbc1,
cache2, "ignitetab2");
                InsertingData(jdbc1, cache2, creatingInsertingCommand1);

                Dataset<Row> df1 =
igniteSparkSession.read().format(IgniteDataFrameSettings.FORMAT_IGNITE())
                                .option(IgniteDataFrameSettings.OPTION_TABLE(), 
"ignitetab1")
                                
.option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(),
                                        
"/opt/kylo/testignite/apache-ignite-fabric-2.5.0-bin/config/default-config.xml")

                                .load().repartition(1);
                df1.createOrReplaceTempView("df1");
                Dataset<Row> df2 =
igniteSparkSession.read().format(IgniteDataFrameSettings.FORMAT_IGNITE())
                                .option(IgniteDataFrameSettings.OPTION_TABLE(), 
"ignitetab2")
                                
.option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(),
                                        
"/opt/kylo/testignite/apache-ignite-fabric-2.5.0-bin/config/default-config.xml")

                                .load().repartition(1);
                
                
                
                df2.createOrReplaceTempView("df2");
                
                logger.info("join query");
                Dataset<Row> ignite1 = igniteSparkSession.sql(
                                "select * from df1 join df2 on
df1.networkelementid_pk=df2.macrositedetailid_pk");

                
logger.info("join query end");
                logger.info("ignite dataframe count------[{}]",ignite1.count());

                igniteSparkSession.close();
                Ignition.stop(true);
        }

        
        private static void creatingTable(Dataset<Row> employee, IgniteCache 
cache,
String tempTable, String index) {

                String query = gettingCreateQuery(employee);

                String str = "CREATE TABLE " + tempTable + " (" + query + ") 
WITH
\"template=partitioned\"";

                System.out.println("create query--->" + str);

                cache.query(new 
SqlFieldsQuery(str).setSchema("PUBLIC")).getAll();

                cache.query(new SqlFieldsQuery("CREATE INDEX on " + tempTable + 
" (" +
index + ")")).getAll();

        }

        private static String gettingCreateQuery(Dataset<Row> employee) {

                String str = "";
                StructField[] fields = employee.schema().fields();
                for (int i = 0; i < fields.length; i++) {
                        String datatype =
fields[i].dataType().typeName().equalsIgnoreCase("string") ? "VARCHAR"
                                        : fields[i].dataType().typeName();
                        if (i == 0) {
                                str = str + fields[i].name() + " " + datatype + 
" PRIMARY KEY, ";
                        } else if (i == fields.length - 1) {
                                str = str + fields[i].name() + " " + datatype;
                        } else {
                                str = str + fields[i].name() + " " + datatype + 
" ,";
                        }
                }

                return str;
        }

        private static void InsertingData(Dataset<Row> employee, IgniteCache 
cache,
SqlFieldsQuery employeeinsert) {

                List<Row> collectAsList = employee.collectAsList();

                for (int i = 0; i < collectAsList.size(); i++) {
                        Row row = collectAsList.get(i);
                        Object[] objectarray = new Object[row.size()];

                        for (int j = 0; j < objectarray.length; j++) {
                                objectarray[j] = row.get(j);
                        }

                
cache.query(employeeinsert.setArgs(objectarray).setSchema("PUBLIC")).getAll();

                }

        }

        private static SqlFieldsQuery creatingInsertingCommand(Dataset<Row>
employee, IgniteCache cache, String tempTable) {

                String query = gettingInsertQuery(employee);

                String fields = 
Arrays.toString(employee.columns()).replaceAll("\\[",
"").replaceAll("\\]", "");

                String str = "INSERT INTO " + tempTable + " (" + fields + ")  
VALUES (" +
query + ")";

                SqlFieldsQuery city = new SqlFieldsQuery(str);
                System.out.println("str-------------->" + str);
                return city;
        }

        private static String gettingInsertQuery(Dataset<Row> employee) {

                String str = "";

                for (int i = 0; i < employee.columns().length; i++) {
                        if (i != employee.columns().length - 1) {
                                str = str + "?, ";
                        } else {
                                str = str + "?";
                        }

                }

                return str;
        }
}




--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Reply via email to