Hello!

Apache Ignite SQL should be accessed via the Ignite JDBC Thin driver. This is
the preferred way.

The JDBC Thin driver also has a streaming mode, which is switched on and off
with the SET STREAMING ON/OFF commands.

Please see the attached file, where I have introduced a "thin" mode.

As for client mode streaming not working, I will look into it further.

Regards,
-- 
Ilya Kasnacheev


пн, 3 дек. 2018 г. в 01:13, joseheitor <[email protected]>:

> Hi Ilya,
>
> Any update on your investigation of this issue...?
>
> Your comments that 'streaming mode' in Client driver and Client driver
> itself are near-deprecated - are very surprising and concerning!
>
> 1. Are you saying that Apache Ignite SQL will cease to be accessible via
> standard JDBC?
>
> 2. If 'streaming mode' is to be deprecated - will there be an alternative
> method of inserting high-throughput data via SQL?
>
> Thanks,
> Jose
>
>
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
>
package com.example.ignite;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Date;
import java.util.Random;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Command-line demo that creates a {@code public.transactions} table in Apache Ignite over JDBC,
 * imports 1000 synthetic transactions (seven key/value rows each) and prints the resulting row count.
 *
 * <p>Arguments: {@code <path/to/config.xml> [stream|thin]}. The first argument is the Ignite
 * configuration file used by the {@code jdbc:ignite:cfg://} URLs. The optional second argument
 * selects how the import step connects:
 * <ul>
 *   <li>{@code stream} - {@code cfg://} URL with {@code streaming=true};</li>
 *   <li>{@code thin} - {@code jdbc:ignite:thin://} URL, with {@code SET STREAMING ON/OFF} issued
 *       around the inserts;</li>
 *   <li>omitted or anything else - plain {@code cfg://} URL.</li>
 * </ul>
 */
@SpringBootApplication
public class IgniteApplication
{
    /** Path to the Ignite configuration file; taken from the first command-line argument. */
    public static String config;
    /** True when the second argument is "stream" (case-insensitive). */
    public static boolean stream = false;
    /** True when the second argument is "thin" (case-insensitive). */
    public static boolean thin = false;

    /** Driver class loaded for the {@code jdbc:ignite:cfg://} URLs. */
    private static final String CLIENT_DRIVER = "org.apache.ignite.IgniteJdbcDriver";
    /** Driver class loaded for the {@code jdbc:ignite:thin://} URL. */
    private static final String THIN_DRIVER = "org.apache.ignite.IgniteJdbcThinDriver";
    /** Hard-coded thin-driver endpoint list used in "thin" mode. */
    private static final String THIN_URL = "jdbc:ignite:thin://192.168.1.230,192.168.1.220";
    /** Common prefix of every {@code cfg://} URL; connection parameters and the config path are appended. */
    private static final String CFG_URL_PREFIX = "jdbc:ignite:cfg://cache=DATASTORE";
    /** Connection parameters used by the DDL and query steps. */
    private static final String ADMIN_PARAMS =
            ":distributedJoins=true:transactionsAllowed=true:multipleStatementsAllowed=true";

    /**
     * Parses the arguments, starts the Spring application and then runs the three steps
     * (init, import, query) in order. Any exception escaping a step is printed to stderr.
     *
     * @param args {@code <path/to/config.xml> [stream|thin]}
     */
    public static void main(String[] args)
    {
        // Validate the command line before starting the Spring context, so a bad invocation fails fast.
        if (args.length < 1) {
            System.out.println("Usage: java -jar ignite-1.jar <path/to/config.xml> [stream|thin]");
            System.exit(1);
        }

        config = args[0];
        if (args.length > 1) {
            if (args[1].equalsIgnoreCase("stream")) {
                stream = true;
            } else if (args[1].equalsIgnoreCase("thin")) {
                thin = true;
            }
        }

        SpringApplication.run(IgniteApplication.class, args);
        try {
            initSQLDatabase();
            importSQLData();
            querySQLDatabase();
        }
        catch (Exception e) {
            e.printStackTrace(System.err);
        }
    }

    /**
     * Builds a {@code jdbc:ignite:cfg://} URL for the configured file.
     *
     * @param params colon-prefixed connection parameters to append after the cache name (may be empty)
     * @return the complete JDBC URL
     */
    private static String cfgUrl(String params) {
        return CFG_URL_PREFIX + params + "@file://" + config;
    }

    /**
     * Loads the given driver class and opens a connection with empty user name and password.
     *
     * @param driverClass fully qualified JDBC driver class name
     * @param url         JDBC URL to connect to
     * @return an open connection; the caller is responsible for closing it
     * @throws ClassNotFoundException if the driver class is not on the classpath
     * @throws SQLException           if the connection cannot be established
     */
    private static Connection connect(String driverClass, String url)
            throws ClassNotFoundException, SQLException {
        Class.forName(driverClass);
        return DriverManager.getConnection(url, "", "");
    }

    /**
     * Executes a single SQL command on its own short-lived statement, which is closed afterwards.
     *
     * @param connection open connection to run the command on
     * @param sql        command text
     * @throws SQLException if the command fails
     */
    private static void executeCommand(Connection connection, String sql) throws SQLException {
        try (Statement command = connection.createStatement()) {
            command.execute(sql);
        }
    }

    /**
     * Drops and recreates the {@code public.transactions} table and its indexes.
     * Failures are printed to stderr and do not stop the program (best effort, as before).
     *
     * @throws SQLException kept for signature compatibility
     */
    private static void initSQLDatabase() throws SQLException {

        String cmd = "DROP TABLE IF EXISTS public.transactions;" +
                     "DROP INDEX IF EXISTS transactions_id_k_v;" +
                     "DROP INDEX IF EXISTS transactions_k_id_v;" +
                     "DROP INDEX IF EXISTS transactions_k_v_id;" +
                     "DROP INDEX IF EXISTS transactions_pk;" +
                     "CREATE TABLE public.transactions (pk INT, id INT, k VARCHAR, v VARCHAR, PRIMARY KEY (pk, id))" +
                     " WITH \"TEMPLATE=PARTITIONED, BACKUPS=1, ATOMICITY=TRANSACTIONAL, WRITE_SYNCHRONIZATION_MODE=FULL_SYNC, AFFINITY_KEY=id\";" +
                     "CREATE INDEX transactions_id_k_v ON public.transactions (id, k, v);" +
                     "CREATE INDEX transactions_k_id_v ON public.transactions (k, id, v);" +
                     "CREATE INDEX transactions_k_v_id ON public.transactions (k, v, id);" +
                     "CREATE INDEX transactions_pk ON public.transactions (pk);";

        // try-with-resources closes the statement and then the connection, even if one close fails.
        try (Connection dbConnection = connect(CLIENT_DRIVER, cfgUrl(ADMIN_PARAMS))) {
            dbConnection.setSchema("PUBLIC");
            try (Statement statement = dbConnection.createStatement()) {
                System.out.print("Initializing database...");
                statement.execute(cmd);
                System.out.print("Done\n");
            }
        }
        catch (Exception e) {
            // Deliberately broad: report the failure and let the remaining steps run.
            e.printStackTrace(System.err);
        }
    }

    /**
     * Inserts 1000 synthetic transactions, seven key/value rows per transaction id, using the
     * connection mode selected by {@link #stream} / {@link #thin}.
     * Failures are printed to stderr and do not stop the program (best effort, as before).
     *
     * @throws SQLException kept for signature compatibility
     */
    private static void importSQLData() throws SQLException {

        Random randomGenerator = new Random();

        String url;
        if (stream) {
            url = cfgUrl(":streaming=true:streamingFlushFrequency=1000");
        } else if (thin) {
            url = THIN_URL;
        } else {
            url = cfgUrl("");
        }
        // Load the driver class that matches the URL scheme chosen above.
        String driverClass = thin ? THIN_DRIVER : CLIENT_DRIVER;

        String insertTableSQL = "INSERT INTO transactions (pk, id, k, v) VALUES (?, ?, ?, ?)";

        String[] initials = {"A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R",
                             "S", "T", "U", "V", "W", "X", "Y", "Z"
        };
        String[] surnames = {"Sindgon", "Costa", "Carvalho", "Carter", "Smythe", "McPhearson", "Buffet", "Newman"
        };
        String[] emails = {"[email protected]", "[email protected]", "[email protected]", "[email protected]"
        };
        String[] currencies = {"ZAR", "CZK", "CNY", "IDR", "AFN", "EUR", "RUB", "MAD"
        };

        try (Connection dbConnection = connect(driverClass, url)) {
            dbConnection.setSchema("PUBLIC");
            if (thin) {
                executeCommand(dbConnection, "SET STREAMING ON");
            }

            try (PreparedStatement statement = dbConnection.prepareStatement(insertTableSQL)) {
                System.out.println("Starting data import... " + new Date());
                int pk = 0;
                for (int id = 0; id < 1000; id++) {

                    System.out.print("[" + id + "] ");
                    String initial = pick(randomGenerator, initials);
                    String surname = pick(randomGenerator, surnames);
                    // Each value is drawn from its own array (previously both came from surnames).
                    String email = pick(randomGenerator, emails);
                    String currency = pick(randomGenerator, currencies);

                    insertRow(statement, ++pk, id, "trans.cust.first_name", initial);
                    insertRow(statement, ++pk, id, "trans.cust.last_name", surname);
                    insertRow(statement, ++pk, id, "trans.cust.email", email);
                    insertRow(statement, ++pk, id, "trans.cust.gender", id % 2 == 0 ? "Male" : "Female");

                    // pk is advanced on its own line because the value below depends on the new pk.
                    pk++;
                    // NOTE(review): yields e.g. "2013-0616" - no separator between month and day, and
                    // the month digit can be 0. Kept as in the original; confirm the intended format.
                    insertRow(statement, pk, id, "trans.date",
                            "201" + id % 10 + "-0" + pk % 10 + "1" + randomGenerator.nextInt(9));

                    pk++;
                    insertRow(statement, pk, id, "trans.amount",
                            String.valueOf(pk * (id + 1) * (randomGenerator.nextInt(15) + 1)));

                    insertRow(statement, ++pk, id, "trans.currency", currency);
                    System.out.print("\n");
                }
                if (thin) {
                    executeCommand(dbConnection, "SET STREAMING OFF");
                }
                System.out.println("Data import complete. " + new Date());
            }
        }
        catch (Exception e) {
            // Deliberately broad: report the failure and let the query step run.
            e.printStackTrace(System.err);
        }
    }

    /**
     * Returns a uniformly chosen element of {@code values}.
     *
     * @param random source of randomness
     * @param values non-empty array to choose from
     * @return one element of {@code values}
     */
    private static String pick(Random random, String[] values) {
        return values[random.nextInt(values.length)];
    }

    /**
     * Binds one (pk, id, k, v) row to the prepared insert, executes it and prints a progress dot.
     *
     * @param statement prepared {@code INSERT INTO transactions (pk, id, k, v)} statement
     * @param pk        value for the {@code pk} column
     * @param id        value for the {@code id} column
     * @param key       value for the {@code k} column
     * @param value     value for the {@code v} column
     * @throws SQLException if binding or execution fails
     */
    private static void insertRow(PreparedStatement statement, int pk, int id, String key, String value)
            throws SQLException {
        statement.setInt(1, pk);
        statement.setInt(2, id);
        statement.setString(3, key);
        statement.setString(4, value);
        statement.execute();
        System.out.print(".");
    }

    /**
     * Prints the number of rows in {@code public.transactions}.
     * Failures are printed to stderr (best effort, as before).
     *
     * @throws SQLException kept for signature compatibility
     */
    private static void querySQLDatabase() throws SQLException {

        String cmd = "SELECT COUNT(*) AS total FROM public.transactions";

        try (Connection dbConnection = connect(CLIENT_DRIVER, cfgUrl(ADMIN_PARAMS))) {
            dbConnection.setSchema("PUBLIC");
            // Statement and result set are both closed before the connection.
            try (Statement statement = dbConnection.createStatement();
                 ResultSet rs = statement.executeQuery(cmd)) {
                while (rs.next()) {
                    System.out.println(" ------------------------------ " +
                            " --->>>  Records in database: " + rs.getLong("total") +
                            " ------------------------------ ");
                }
            }
        }
        catch (Exception e) {
            // Deliberately broad: report the failure; nothing further depends on this step.
            e.printStackTrace(System.err);
        }
    }
}

Reply via email to