Hello!
Apache Ignite SQL should be accessed through the Ignite JDBC Thin driver; this is
the preferred way.
The JDBC Thin driver also has a streaming mode, which is toggled with the
SET STREAMING ON/OFF command.
Please see the attached file, in which I have introduced a "thin" mode.
As for client-mode streaming not working, I will look into it further.
Regards,
--
Ilya Kasnacheev
пн, 3 дек. 2018 г. в 01:13, joseheitor <[email protected]>:
> Hi Ilya,
>
> Any update on your investigation of this issue...?
>
> Your comments that 'streaming mode' in the Client driver, and the Client driver
> itself, are near-deprecated are very surprising and concerning!
>
> 1. Are you saying that Apache Ignite SQL will cease to be accessible via
> standard JDBC?
>
> 2. If 'streaming mode' is to be deprecated - will there be an alternative
> method of inserting high-throughput data via SQL?
>
> Thanks,
> Jose
>
>
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
>
package com.example.ignite;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Date;
import java.util.Random;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Small command-line test harness that exercises Apache Ignite SQL over JDBC.
 *
 * <p>It runs three steps in order: (re)create the {@code public.transactions} table and its
 * indexes, insert generated key/value rows, and print the resulting row count.
 *
 * <p>Usage: {@code java -jar ignite-1.jar <path/to/config.xml> [stream|thin]}
 * <ul>
 *   <li>no mode argument - rows are inserted through the configuration-file based driver;</li>
 *   <li>{@code stream} - same driver, with {@code streaming=true} in the connection URL;</li>
 *   <li>{@code thin} - rows are inserted through the JDBC Thin driver, wrapped in
 *       {@code SET STREAMING ON} / {@code SET STREAMING OFF}.</li>
 * </ul>
 */
@SpringBootApplication
public class IgniteApplication
{
    /** Class name of the configuration-file based ({@code jdbc:ignite:cfg://}) JDBC driver. */
    private static final String CFG_DRIVER_CLASS = "org.apache.ignite.IgniteJdbcDriver";

    /** Class name of the JDBC Thin ({@code jdbc:ignite:thin://}) driver. */
    private static final String THIN_DRIVER_CLASS = "org.apache.ignite.IgniteJdbcThinDriver";

    /** URL prefix used for the DDL and query connections; the config file path is appended. */
    private static final String CFG_ADMIN_URL_PREFIX =
        "jdbc:ignite:cfg://cache=DATASTORE:distributedJoins=true:transactionsAllowed=true:multipleStatementsAllowed=true@file://";

    /** Number of distinct {@code id} values generated by {@link #importSQLData()}. */
    private static final int ID_COUNT = 1000;

    /** Path to the Ignite XML configuration file; set from the first command-line argument. */
    public static String config;

    /** {@code true} when the second command-line argument is {@code stream}. */
    public static boolean stream = false;

    /** {@code true} when the second command-line argument is {@code thin}. */
    public static boolean thin = false;

    /**
     * Starts the Spring application, parses the command line and runs the three SQL steps.
     *
     * <p>Any exception escaping a step is printed to {@code System.err}; nothing is rethrown.
     *
     * @param args {@code args[0]} is the path to the Ignite XML configuration (required);
     *             {@code args[1]} is optionally {@code stream} or {@code thin} (case-insensitive)
     */
    public static void main(String[] args)
    {
        SpringApplication.run(IgniteApplication.class, args);
        try {
            if (args.length < 1) {
                System.out.println("Usage: java -jar ignite-1.jar <path/to/config.xml> [stream|thin]");
                System.exit(1);
            } else {
                config = args[0];
                if (args.length > 1) {
                    // Braced explicitly: the original relied on dangling-else binding.
                    if (args[1].equalsIgnoreCase("stream")) {
                        stream = true;
                    } else if (args[1].equalsIgnoreCase("thin")) {
                        thin = true;
                    }
                }
            }
            initSQLDatabase();
            importSQLData();
            querySQLDatabase();
        }
        catch (Exception e) {
            e.printStackTrace(System.err);
        }
    }

    /**
     * Drops and recreates the {@code public.transactions} table and its four indexes by sending
     * one multi-statement script over a configuration-file based connection.
     *
     * <p>Failures (including failures while closing resources) are printed to
     * {@code System.err} and not rethrown.
     *
     * @throws SQLException kept in the signature for compatibility with existing callers
     */
    private static void initSQLDatabase() throws SQLException {
        String url = CFG_ADMIN_URL_PREFIX + config;
        String cmd = "DROP TABLE IF EXISTS public.transactions;" +
            "DROP INDEX IF EXISTS transactions_id_k_v;" +
            "DROP INDEX IF EXISTS transactions_k_id_v;" +
            "DROP INDEX IF EXISTS transactions_k_v_id;" +
            "DROP INDEX IF EXISTS transactions_pk;" +
            "CREATE TABLE public.transactions (pk INT, id INT, k VARCHAR, v VARCHAR, PRIMARY KEY (pk, id))" +
            " WITH \"TEMPLATE=PARTITIONED, BACKUPS=1, ATOMICITY=TRANSACTIONAL, WRITE_SYNCHRONIZATION_MODE=FULL_SYNC, AFFINITY_KEY=id\";" +
            "CREATE INDEX transactions_id_k_v ON public.transactions (id, k, v);" +
            "CREATE INDEX transactions_k_id_v ON public.transactions (k, id, v);" +
            "CREATE INDEX transactions_k_v_id ON public.transactions (k, v, id);" +
            "CREATE INDEX transactions_pk ON public.transactions (pk);";
        try {
            Class.forName(CFG_DRIVER_CLASS);
            // try-with-resources closes the statement first, then the connection, even if one close fails.
            try (Connection dbConnection = DriverManager.getConnection(url, "", "")) {
                dbConnection.setSchema("PUBLIC");
                try (Statement statement = dbConnection.createStatement()) {
                    System.out.print("Initializing database...");
                    statement.execute(cmd);
                    System.out.print("Done\n");
                }
            }
        }
        catch (Exception e) {
            e.printStackTrace(System.err);
        }
    }

    /**
     * Inserts seven key/value rows for each of {@value #ID_COUNT} ids into {@code transactions}.
     *
     * <p>The connection URL and driver depend on the mode flags: {@link #stream} takes
     * precedence, then {@link #thin}, otherwise the plain configuration-file URL is used.
     * When {@link #thin} is set, the inserts are bracketed by {@code SET STREAMING ON} and
     * {@code SET STREAMING OFF}.
     *
     * <p>Failures (including failures while closing resources) are printed to
     * {@code System.err} and not rethrown.
     *
     * @throws SQLException kept in the signature for compatibility with existing callers
     */
    private static void importSQLData() throws SQLException {
        Random randomGenerator = new Random();
        String url;
        String driverClass;
        if (stream) {
            url = "jdbc:ignite:cfg://cache=DATASTORE:streaming=true:streamingFlushFrequency=1000@file://" + config;
            driverClass = CFG_DRIVER_CLASS;
        } else if (thin) {
            // NOTE(review): node addresses are hard-coded; confirm they match the target cluster.
            url = "jdbc:ignite:thin://192.168.1.230,192.168.1.220";
            // The thin URL is served by the thin driver class, not by IgniteJdbcDriver.
            driverClass = THIN_DRIVER_CLASS;
        } else {
            url = "jdbc:ignite:cfg://cache=DATASTORE@file://" + config;
            driverClass = CFG_DRIVER_CLASS;
        }
        String insertTableSQL = "INSERT INTO transactions (pk, id, k, v) VALUES (?, ?, ?, ?)";
        String[] initials = {"A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R",
            "S", "T", "U", "V", "W", "X", "Y", "Z"
        };
        String[] surnames = {"Sindgon", "Costa", "Carvalho", "Carter", "Smythe", "McPhearson", "Buffet", "Newman"
        };
        // NOTE(review): all four entries are identical and look like redacted placeholders - confirm.
        String[] emails = {"[email protected]", "[email protected]", "[email protected]", "[email protected]"
        };
        String[] currencies = {"ZAR", "CZK", "CNY", "IDR", "AFN", "EUR", "RUB", "MAD"
        };
        try {
            Class.forName(driverClass);
            try (Connection dbConnection = DriverManager.getConnection(url, "", "")) {
                dbConnection.setSchema("PUBLIC");
                if (thin) {
                    setStreaming(dbConnection, true);
                }
                try (PreparedStatement statement = dbConnection.prepareStatement(insertTableSQL)) {
                    System.out.println("Starting data import... " + new Date());
                    int pk = 0;
                    for (int id = 0; id < ID_COUNT; id++) {
                        System.out.print("[" + id + "] ");
                        String initial = initials[randomGenerator.nextInt(initials.length)];
                        String surname = surnames[randomGenerator.nextInt(surnames.length)];
                        // Fixed: email and currency were previously both drawn from surnames.
                        String email = emails[randomGenerator.nextInt(emails.length)];
                        String currency = currencies[randomGenerator.nextInt(currencies.length)];

                        insertRow(statement, ++pk, id, "trans.cust.first_name", initial);
                        System.out.print(".");

                        insertRow(statement, ++pk, id, "trans.cust.last_name", surname);
                        System.out.print(".");

                        insertRow(statement, ++pk, id, "trans.cust.email", email);
                        System.out.print(".");

                        insertRow(statement, ++pk, id, "trans.cust.gender", id % 2 == 0 ? "Male" : "Female");
                        System.out.print(".");

                        ++pk;
                        // yyyy-MM-dd: year 2010-2019, month 01-09, day 10-18.
                        // Fixed: the month/day separator was missing and the month could be "00".
                        String date = "201" + id % 10 + "-0" + (pk % 9 + 1) + "-1" + randomGenerator.nextInt(9);
                        insertRow(statement, pk, id, "trans.date", date);
                        System.out.print(".");

                        ++pk;
                        String amount = String.valueOf(pk * (id + 1) * (randomGenerator.nextInt(15) + 1));
                        insertRow(statement, pk, id, "trans.amount", amount);
                        System.out.print(".");

                        insertRow(statement, ++pk, id, "trans.currency", currency);
                        System.out.print(".\n");
                    }
                    if (thin) {
                        // Switched off while the prepared statement is still open, as before.
                        setStreaming(dbConnection, false);
                    }
                    System.out.println("Data import complete. " + new Date());
                }
            }
        } catch (Exception e) {
            e.printStackTrace(System.err);
        }
    }

    /**
     * Executes {@code SET STREAMING ON} or {@code SET STREAMING OFF} on the given connection,
     * closing the temporary statement afterwards.
     *
     * @param connection open connection to issue the command on
     * @param on         {@code true} to send {@code ON}, {@code false} to send {@code OFF}
     * @throws SQLException if creating, executing or closing the statement fails
     */
    private static void setStreaming(Connection connection, boolean on) throws SQLException {
        try (Statement toggle = connection.createStatement()) {
            toggle.execute(on ? "SET STREAMING ON" : "SET STREAMING OFF");
        }
    }

    /**
     * Binds the four insert parameters and executes the prepared {@code INSERT}.
     *
     * @param statement prepared {@code INSERT INTO transactions (pk, id, k, v)} statement
     * @param pk        value for the {@code pk} column
     * @param id        value for the {@code id} column
     * @param key       value for the {@code k} column
     * @param value     value for the {@code v} column
     * @throws SQLException if binding or execution fails
     */
    private static void insertRow(PreparedStatement statement, int pk, int id, String key, String value)
        throws SQLException {
        statement.setInt(1, pk);
        statement.setInt(2, id);
        statement.setString(3, key);
        statement.setString(4, value);
        statement.execute();
    }

    /**
     * Prints the number of rows in {@code public.transactions} using a configuration-file
     * based connection.
     *
     * <p>Failures (including failures while closing resources) are printed to
     * {@code System.err} and not rethrown.
     *
     * @throws SQLException kept in the signature for compatibility with existing callers
     */
    private static void querySQLDatabase() throws SQLException {
        String url = CFG_ADMIN_URL_PREFIX + config;
        String cmd = "SELECT COUNT(*) AS total FROM public.transactions";
        try {
            Class.forName(CFG_DRIVER_CLASS);
            try (Connection dbConnection = DriverManager.getConnection(url, "", "")) {
                dbConnection.setSchema("PUBLIC");
                // The result set is now closed too; it was previously left open.
                try (Statement statement = dbConnection.createStatement();
                     ResultSet rs = statement.executeQuery(cmd)) {
                    while (rs.next()) {
                        System.out.println(" ------------------------------ " +
                            " --->>> Records in database: " + rs.getLong("total") +
                            " ------------------------------ ");
                    }
                }
            }
        }
        catch (Exception e) {
            e.printStackTrace(System.err);
        }
    }
}