package com.test;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spark.IgniteContext;
import org.apache.ignite.spark.IgniteDataFrameSettings;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.ignite.IgniteSparkSession;
import org.apache.spark.sql.types.StructField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.inn.sparkrunner.context.JobContext;
import com.inn.sparkrunner.context.JobContextImpl;
import com.inn.sparkrunner.createsparkcontext.CreateSparkContext;
import com.inn.sparkrunner.util.ConfigUtil;

public class JoinUsingIgnite {

    public static void main(String[] args) throws Exception {
        Logger logger = LoggerFactory.getLogger(JoinUsingIgnite.class);
        ConfigUtil.setConfig();
        System.setProperty("IGNITE_HOME", "/opt/kylo/testignite/apache-ignite-fabric-2.5.0-bin");

        SparkSession sparkSession = CreateSparkContext.create(args);
        JobContext jobContext = new JobContextImpl(sparkSession);
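
        // Point the embedded Ignite node at the existing cluster via static IP discovery.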
        TcpDiscoverySpi spi = new TcpDiscoverySpi();
        TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
        ipFinder.setAddresses(Arrays.asList("sf3.start.com", "sf2.start.com"));
        spi.setIpFinder(ipFinder);

        IgniteConfiguration cfg = new IgniteConfiguration();
        // cfg.setIgniteInstanceName("grid");
        cfg.setDiscoverySpi(spi);
        Ignite ignite = Ignition.start(cfg);
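
        // Wrap the SparkSession so Spark SQL can query Ignite-backed tables.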
        IgniteContext igniteContext = new IgniteContext(sparkSession.sparkContext(),
                "/opt/kylo/testignite/apache-ignite-fabric-2.5.0-bin/config/default-config.xml");
        IgniteSparkSession igniteSparkSession = new IgniteSparkSession(igniteContext, sparkSession);
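
        // Pull both source tables out of MySQL as DataFrames.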
        Properties connectionProperties = new Properties();
        connectionProperties.setProperty("user", "FORESIGHT_PRODUCT");
        connectionProperties.setProperty("password", "FORESIGHT_PRODUCT@#^249");
        connectionProperties.setProperty("driver", "com.mysql.jdbc.Driver");
logger.info("spark start------");
Dataset<Row> jdbc =
igniteSparkSession.read().option("inferschema",
"false").jdbc(
"jdbc:mysql://192.168.4.249:3306/FORESIGHT",
"(SELECT * FROM
NetworkElement) emp",
connectionproperties);
Dataset<Row> jdbc1 =
igniteSparkSession.read().option("inferschema",
"false").jdbc(
"jdbc:mysql://192.168.4.249:3306/FORESIGHT",
"(SELECT * FROM
MacroSiteDetail) emp1",
connectionproperties);
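
        // Create two caches whose SQL tables live in the PUBLIC schema.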
        CacheConfiguration<?, ?> cacheCfg = new CacheConfiguration<>("cachecfg").setSqlSchema("PUBLIC");
        IgniteCache<?, ?> cache = ignite.getOrCreateCache(cacheCfg);
        CacheConfiguration<?, ?> cacheCfg2 = new CacheConfiguration<>("table2").setSqlSchema("PUBLIC");
        IgniteCache<?, ?> cache2 = ignite.getOrCreateCache(cacheCfg2);

        creatingTable(jdbc, cache, "ignitetab1", "networkelementid_pk");
        SqlFieldsQuery insertIntoTab1 = creatingInsertingCommand(jdbc, "ignitetab1");
        insertingData(jdbc, cache, insertIntoTab1);

        creatingTable(jdbc1, cache2, "ignitetab2", "macrositedetailid_pk");
        SqlFieldsQuery insertIntoTab2 = creatingInsertingCommand(jdbc1, "ignitetab2");
        insertingData(jdbc1, cache2, insertIntoTab2);
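
        // Read the freshly populated Ignite tables back as Spark DataFrames.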
Dataset<Row> df1 =
igniteSparkSession.read().format(IgniteDataFrameSettings.FORMAT_IGNITE())
.option(IgniteDataFrameSettings.OPTION_TABLE(),
"ignitetab1")
.option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(),
"/opt/kylo/testignite/apache-ignite-fabric-2.5.0-bin/config/default-config.xml")
.load().repartition(1);
df1.createOrReplaceTempView("df1");
Dataset<Row> df2 =
igniteSparkSession.read().format(IgniteDataFrameSettings.FORMAT_IGNITE())
.option(IgniteDataFrameSettings.OPTION_TABLE(),
"ignitetab2")
.option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(),
"/opt/kylo/testignite/apache-ignite-fabric-2.5.0-bin/config/default-config.xml")
.load().repartition(1);
df2.createOrReplaceTempView("df2");
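
        // Join the two tables through Spark SQL and materialize the result.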
logger.info("join query");
Dataset<Row> ignite1 = igniteSparkSession.sql(
"select * from df1 join df2 on
df1.networkelementid_pk=df2.macrositedetailid_pk");
logger.info("join query end");
logger.info("ignite dataframe count------[{}]",ignite1.count());
igniteSparkSession.close();
Ignition.stop(true);
}

    private static void creatingTable(Dataset<Row> employee, IgniteCache<?, ?> cache,
            String tempTable, String index) {
        String columns = gettingCreateQuery(employee);
        String str = "CREATE TABLE " + tempTable + " (" + columns + ") WITH \"template=partitioned\"";
        System.out.println("create query--->" + str);
        cache.query(new SqlFieldsQuery(str).setSchema("PUBLIC")).getAll();
        cache.query(new SqlFieldsQuery("CREATE INDEX ON " + tempTable + " (" + index + ")")
                .setSchema("PUBLIC")).getAll();
    }
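
    // Builds the column list for CREATE TABLE, mapping Spark's "string" type to
    // VARCHAR and declaring the first column as the primary key.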
    private static String gettingCreateQuery(Dataset<Row> employee) {
        StringBuilder columns = new StringBuilder();
        StructField[] fields = employee.schema().fields();
        for (int i = 0; i < fields.length; i++) {
            String datatype = fields[i].dataType().typeName().equalsIgnoreCase("string")
                    ? "VARCHAR" : fields[i].dataType().typeName();
            columns.append(fields[i].name()).append(' ').append(datatype);
            if (i == 0)
                columns.append(" PRIMARY KEY");
            if (i < fields.length - 1)
                columns.append(", ");
        }
        return columns.toString();
    }
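
    // Row-by-row insert; collectAsList() pulls the whole dataset onto the driver,
    // so this is only suitable for small tables.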
    private static void insertingData(Dataset<Row> employee, IgniteCache<?, ?> cache,
            SqlFieldsQuery insert) {
        List<Row> rows = employee.collectAsList();
        for (Row row : rows) {
            Object[] args = new Object[row.size()];
            for (int j = 0; j < args.length; j++)
                args[j] = row.get(j);
            cache.query(insert.setArgs(args).setSchema("PUBLIC")).getAll();
        }
    }
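
    // Builds a parameterized INSERT statement for the given table.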
    private static SqlFieldsQuery creatingInsertingCommand(Dataset<Row> employee, String tempTable) {
        String placeholders = gettingInsertQuery(employee);
        String fields = String.join(", ", employee.columns());
        String str = "INSERT INTO " + tempTable + " (" + fields + ") VALUES (" + placeholders + ")";
        System.out.println("str-------------->" + str);
        return new SqlFieldsQuery(str);
    }
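
    // Emits one "?" placeholder per column for the VALUES clause.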
    private static String gettingInsertQuery(Dataset<Row> employee) {
        StringBuilder placeholders = new StringBuilder();
        for (int i = 0; i < employee.columns().length; i++)
            placeholders.append(i == 0 ? "?" : ", ?");
        return placeholders.toString();
    }
}
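
As a side note: the manual creatingTable()/insertingData() path above could probably be replaced by Ignite's DataFrame write support, added in 2.5. This is only a sketch under the assumption that the 2.5 save API behaves as documented, reusing the config file path and primary-key column from the code above:

    // Hypothetical alternative to creatingTable()/insertingData(): let the
    // Ignite data source create the table and stream the rows in, distributed
    // across the executors instead of collected onto the driver.
    jdbc.write()
            .format(IgniteDataFrameSettings.FORMAT_IGNITE())
            .option(IgniteDataFrameSettings.OPTION_TABLE(), "ignitetab1")
            .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(),
                    "/opt/kylo/testignite/apache-ignite-fabric-2.5.0-bin/config/default-config.xml")
            // Column(s) to use as the primary key, instead of hand-writing CREATE TABLE.
            .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "networkelementid_pk")
            .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS(), "template=partitioned")
            .mode(SaveMode.Overwrite)
            .save();

This needs an extra import of org.apache.spark.sql.SaveMode, and the second table would be written the same way with its own key column.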