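/**********************************************************************
* FILENAME :        odbcsample.cc
*
* DESCRIPTION :
*       ODBC throughput test against Apache Ignite. Recreates the
*       CDRTEST table and its key01 index, then times a mix of prepared
*       INSERT / SELECT / UPDATE statements with manual commits.
*
* USAGE :
* 		<program> <numpartial> <numcompleterec>
*
* ODBC USAGE :
* 		SQLDriverConnect - to connect with the "Apache Ignite" driver
* 		SQLExecDirect - to execute the DROP / CREATE statements
* 		SQLPrepare, SQLBindParameter, SQLExecute - for the timed statements
* 		SQLEndTran - to commit
*
*/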
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <sql.h>
#include <sqlext.h>
#include <string>
#include <vector>
//#include "util.c"

// Length constants; neither is referenced elsewhere in the visible source.
#define NAME_LEN 64
#define STMT_LEN 128

// SQL data type codes. sql_prepare compares them against the type reported by
// SQLDescribeParam, and sql_set_parameter uses them to choose a conversion.
#define SQL_TYPE_VARCHAR	12
#define SQL_TYPE_DOUBLE		8
#define SQL_TYPE_LONG		-5

// Size in bytes of every fixed character buffer used for parameter and result text.
enum { ODBC_BUFFER_SIZE = 1024 };

/*
 * CHECK_ERROR(e, s, h, t)
 *   e - SQLRETURN value to test (evaluated exactly once)
 *   s - text passed to extract_error to identify the failing step
 *   h - handle whose diagnostic records are printed
 *   t - handle type of h (SQL_HANDLE_ENV / SQL_HANDLE_DBC / SQL_HANDLE_STMT)
 *
 * When e is neither SQL_SUCCESS nor SQL_SUCCESS_WITH_INFO, prints the
 * diagnostics for h and makes the ENCLOSING function return 1. It can
 * therefore only be used inside functions that return int.
 *
 * Written as do { } while (0) so it is standard C++ and behaves as a single
 * statement after if/else.
 */
#define CHECK_ERROR(e, s, h, t) \
            do { \
                const SQLRETURN check_error_rc_ = (e); \
                if (check_error_rc_ != SQL_SUCCESS && check_error_rc_ != SQL_SUCCESS_WITH_INFO) { \
                    extract_error(s, h, t); \
                    return 1; \
                } \
            } while (0)

// Values returned by sql_prepare for the four statements prepared in main;
// they are used as indexes into mHandlesIgnite.
int insert_handle;
int select_handle;
int update_handle;
int delete_handle;

SQLHENV  henv  = SQL_NULL_HENV;   	// Environment
SQLHDBC  hdbc  = SQL_NULL_HDBC;   	// Connection handle
SQLHSTMT hstmt = SQL_NULL_HSTMT;  	// Statement handle
// Return code of the most recent ODBC call; written by every function in this file.
SQLRETURN retcode;
// Length/indicator value shared by all bound VARCHAR parameters;
// sql_prepare sets it to SQL_NTS.
SQLLEN g_lnLen;

// Fixed-size character buffer together with the length/indicator slot that
// is handed to the driver alongside it.
struct OdbcStringBuffer
{
	SQLCHAR buffer[ODBC_BUFFER_SIZE];	// character data
	SQLLEN reallen;						// length/indicator written by the driver
};

/*
 * Storage for one bound statement parameter.
 *
 * m_dataType holds the SQL type code reported for the parameter and selects
 * which member of m_value is meaningful (SQL_TYPE_VARCHAR -> m_arr_char,
 * SQL_TYPE_DOUBLE -> m_double, SQL_TYPE_LONG -> m_long). The address of the
 * active member is bound to the statement, so a param must outlive the
 * statement it is bound to.
 */
struct param {
	// Start fully zeroed: a parameter that is bound but never assigned then
	// reads as 0 / "" instead of indeterminate bytes (the character buffer is
	// bound as a NUL-terminated string, so it must contain a terminator).
	param() : m_dataType(0) {
		memset(&m_value, 0, sizeof(m_value));
	}

	~param(){}

	union uval{
		double m_double;
		long m_long;
		char m_arr_char[ODBC_BUFFER_SIZE];
	};

	int m_dataType;		// SQL type code of the parameter
	uval m_value;		// value buffer bound to the statement
};

struct IGNHandle {
	IGNHandle() {}

	~IGNHandle() {
		SQLFreeHandle(SQL_HANDLE_STMT, stmt);
	}
	
	SQLHSTMT stmt;
	std::vector<param*> m_params;
	char m_ret[ODBC_BUFFER_SIZE];
};

// Three fixed-size text fields. Only mVecMockRecord refers to this type in
// the visible source; nothing shown here fills or reads the fields.
struct record {
	record() {}

	~record() {
	}
	
	char mkey[25];
	char partial[25];
	char num[25];
};

// Every statement created by sql_prepare; the int "handle" used by the sql_*
// functions is an index into this vector.
std::vector<IGNHandle*> mHandlesIgnite;	
// Not referenced by any function in the visible source.
std::vector<record*> mVecMockRecord;	


void extract_error(char *fn, SQLHANDLE handle, SQLSMALLINT type)
{
    SQLINTEGER i = 0;
    SQLINTEGER NativeError;
    SQLCHAR SQLState[ 7 ];
    SQLCHAR MessageText[256];
    SQLSMALLINT TextLength;
    SQLRETURN ret;

    fprintf(stderr, "\nThe driver reported the following error %s\n", fn);
    do
    {
        ret = SQLGetDiagRec(type, handle, ++i, SQLState, &NativeError,
                            MessageText, sizeof(MessageText), &TextLength);
        if (SQL_SUCCEEDED(ret)) {
            printf("%s:%ld:%ld:%s\n",
                        SQLState, (long) i, (long) NativeError, MessageText);
        }
    }
    while( ret == SQL_SUCCESS );
}

int sql_prepare(const char *sqlStmt)
{
    //printf("Prepare statement '%s'", sqlStmt);

	IGNHandle *p_hdl = new IGNHandle();

    retcode = SQLAllocHandle(SQL_HANDLE_STMT, hdbc, &(p_hdl->stmt));
	CHECK_ERROR(retcode, "Fail to allocate stmt", p_hdl->stmt, SQL_HANDLE_STMT);

    retcode = SQLPrepare(p_hdl->stmt, reinterpret_cast<SQLCHAR*>(const_cast<char*>(sqlStmt)), SQL_NTS);
	CHECK_ERROR(retcode, "Fail to prepare stmt", p_hdl->stmt, SQL_HANDLE_STMT);

	SQLSMALLINT NumParams;

    retcode = SQLNumParams(p_hdl->stmt, &NumParams);
	CHECK_ERROR(retcode, "Fail to get param num", p_hdl->stmt, SQL_HANDLE_STMT);
    
	SQLSMALLINT	DataType, DecimalDigits, Nullable;
	SQLULEN		bytesRemaining;	
	g_lnLen = SQL_NTS;


	for (int i = 1; i <= NumParams; i++) {  
		retcode = SQLDescribeParam(p_hdl->stmt,
								   i,
								   &DataType,
								   &bytesRemaining,
								   &DecimalDigits,
								   &Nullable);
		CHECK_ERROR(retcode, "Fail to describe param", p_hdl->stmt, SQL_HANDLE_STMT);
		
		//printf("param_no: %d, Data Type : %i, bytesRemaining : %i, DecimalDigits : %i, Nullable %i\n", i,
		//						(int)DataType, (int)bytesRemaining,
		//						(int)DecimalDigits, (int)Nullable);
		
		param *p_param = new param();
		
		int dtype = (int)DataType;
		p_param->m_dataType = dtype;

		if(dtype == SQL_TYPE_VARCHAR) {
			//printf("set SQL_TYPE_VARCHAR\n");
			retcode = SQLBindParameter(p_hdl->stmt
									, i
									, SQL_PARAM_INPUT
									, SQL_C_CHAR
									, SQL_VARCHAR
									, ODBC_BUFFER_SIZE
									, ODBC_BUFFER_SIZE
									, p_param->m_value.m_arr_char
									, 0
									, &g_lnLen);
		} else if(dtype == SQL_TYPE_DOUBLE) {
			//printf("set SQL_TYPE_DOUBLE\n");
			retcode = SQLBindParameter(p_hdl->stmt
									, i
									, SQL_PARAM_INPUT
									, SQL_C_DOUBLE
									, SQL_DOUBLE
									, 0
									, 0
									, &(p_param->m_value.m_double)
									, 0
									, 0);
		} else if(dtype == SQL_TYPE_LONG) {
			//printf("set SQL_TYPE_LONG\n");
			retcode = SQLBindParameter(p_hdl->stmt
									, i
									, SQL_PARAM_INPUT
									, SQL_C_SLONG
									, SQL_BIGINT
									, 0
									, 0
									, &(p_param->m_value.m_long)
									, 0
									, 0);
		}
		CHECK_ERROR(retcode, "Fail to bind param", p_hdl->stmt, SQL_HANDLE_STMT);

		(p_hdl->m_params).push_back(p_param);
	}

	mHandlesIgnite.push_back(p_hdl);

    //printf("Allocated handle number %d for the ignite statement.", mHandlesIgnite.size()-1);
    return mHandlesIgnite.size()-1;
}

int sql_set_parameter(int handle, int param_no, const char *value)
{
	IGNHandle *p_hdl = mHandlesIgnite[handle];

	int type = (*((p_hdl->m_params)[param_no])).m_dataType;
	if(type == SQL_TYPE_VARCHAR) {
		//printf("set SQL_TYPE_VARCHAR with value: %s\n", value);		
		strcpy((*((p_hdl->m_params)[param_no])).m_value.m_arr_char, value);
	} else if(type == SQL_TYPE_DOUBLE) {
		double d = atof(value);
		//printf("set SQL_TYPE_DOUBLE with value: %f\n", d);
		(*((p_hdl->m_params)[param_no])).m_value.m_double = d;
	} else if(type == SQL_TYPE_LONG) {
		long l = atol(value);
		//printf("set SQL_TYPE_LONG with value: %ld\n", l);
		(*((p_hdl->m_params)[param_no])).m_value.m_long = l;
	}
    return 0;
}

int sql_execute(int handle)
{
	IGNHandle *p_hdl = mHandlesIgnite[handle];
	
	retcode = SQLExecute(p_hdl->stmt);
	CHECK_ERROR(retcode, "Fail to execute handle", p_hdl->stmt, SQL_HANDLE_STMT);

    return 0;
}

int sql_fetch(int handle)
{
	IGNHandle *p_hdl = mHandlesIgnite[handle];
	retcode = SQLFetch(p_hdl->stmt);
	CHECK_ERROR(retcode, "Fail to fetch record", p_hdl->stmt, SQL_HANDLE_STMT);

    return 0;
}

char* sql_get_value(int handle, int value_no)
{
	IGNHandle *p_hdl = mHandlesIgnite[handle];

	OdbcStringBuffer columns;
	SQLBindCol(p_hdl->stmt, value_no, SQL_CHAR, columns.buffer, ODBC_BUFFER_SIZE, &columns.reallen);

	sprintf(p_hdl->m_ret, "%s", columns.buffer);
	return &((p_hdl->m_ret)[0]);
}

int sql_commit()
{
	retcode = SQLEndTran(SQL_HANDLE_DBC, hdbc, SQL_COMMIT);
	CHECK_ERROR(retcode, "Fail to commit", hdbc, SQL_HANDLE_DBC);
    return 0;
}

/*
 * Closes the open cursor of a statement so that it can be executed again.
 * The statement stays prepared and its parameter bindings are kept.
 *
 * handle      - value returned by sql_prepare
 * unset_error - accepted for interface compatibility; not used
 *
 * Returns 0 on success, 1 on an invalid handle or a driver error.
 */
int sql_close(int handle, bool unset_error)
{
	(void)unset_error;

	if (handle < 0 || static_cast<size_t>(handle) >= mHandlesIgnite.size()
			|| mHandlesIgnite[handle] == NULL) {
		fprintf(stderr, "sql_close: invalid handle %d\n", handle);
		return 1;
	}
	IGNHandle *p_hdl = mHandlesIgnite[handle];

	retcode = SQLFreeStmt(p_hdl->stmt, SQL_CLOSE);
	CHECK_ERROR(retcode, "Fail to close handle", p_hdl->stmt, SQL_HANDLE_STMT);

	return 0;
}

int main(int argc, char *argv[]) {

	long numpartial = atol(argv[1]);
	long numcompleterec = atol(argv[2]);

	// Allocate an environment handle
	retcode=SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &henv);
	CHECK_ERROR(retcode, "SQLAllocHandle(SQL_HANDLE_ENV)",
                    henv, SQL_HANDLE_ENV);

	// We want ODBC 3 support
	retcode = SQLSetEnvAttr(henv, SQL_ATTR_ODBC_VERSION,
                            (void *) SQL_OV_ODBC3, 0);
	CHECK_ERROR(retcode, "SQLSetEnvAttr(SQL_ATTR_ODBC_VERSION)",
                    hdbc, SQL_HANDLE_DBC);

	// Allocate a connection handle
	retcode = SQLAllocHandle(SQL_HANDLE_DBC, henv, &hdbc);
	CHECK_ERROR(retcode, "SQLAllocHandle", hdbc, SQL_HANDLE_DBC);

	// Connect to the DSN
	//retcode=SQLDriverConnect(hdbc, NULL, "DSN=DATASOURCE;", SQL_NTS,
    //                     NULL, 0, NULL, SQL_DRIVER_COMPLETE);

	std::string connectStr = "DRIVER={Apache Ignite};SERVER=localhost;PORT=10800;SCHEMA=PUBLIC;";
	SQLCHAR outstr[1024];
	SQLSMALLINT outstrlen;

	retcode = SQLDriverConnect(hdbc, NULL, reinterpret_cast<SQLCHAR*>(&connectStr[0]),
		static_cast<SQLSMALLINT>(connectStr.size()), outstr, sizeof(outstr), &outstrlen, SQL_DRIVER_COMPLETE);
	CHECK_ERROR(retcode, "SQLDriverConnect(DRIVER={Apache Ignite};SERVER=localhost;PORT=10800;SCHEMA=PUBLIC;)",
                    hdbc, SQL_HANDLE_DBC);

	retcode = SQLSetConnectAttr(hdbc, SQL_ATTR_AUTOCOMMIT, SQL_AUTOCOMMIT_OFF, 0);
	CHECK_ERROR(retcode, "Failed to set manual commit", hdbc, SQL_HANDLE_DBC);

	// Allocate a statement handle
	retcode=SQLAllocHandle(SQL_HANDLE_STMT, hdbc, &hstmt);
	CHECK_ERROR(retcode, "SQLAllocHandle(STMT)", hstmt, SQL_HANDLE_STMT);

    //printf ("\nConnection Complete.\n");

    const char* insert_sql = "insert into CDRTEST values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
    const char* update_sql = "update CDRTEST set value01=?,value02=?,value03=?,value04=?,value05=?,value21=?,value22=?,value23=?,value24=?,value25=? where key01=?";
    const char* delete_sql = "delete from CDRTEST where key01=?";
    const char* select_sql = "select value01 from CDRTEST USE INDEX(key01t_idx) where key01=?";

    retcode = SQLExecDirect(hstmt, reinterpret_cast<SQLCHAR*>(const_cast<char*>("DROP TABLE IF EXISTS CDRTEST")), SQL_NTS);
            CHECK_ERROR(retcode, "Fail to drop table", hstmt, SQL_HANDLE_STMT);

    retcode = SQLExecDirect(hstmt, reinterpret_cast<SQLCHAR*>(const_cast<char*>("DROP INDEX IF EXISTS key01t_idx")), SQL_NTS);
            CHECK_ERROR(retcode, "Fail to drop index", hstmt, SQL_HANDLE_STMT);

    retcode = SQLExecDirect(hstmt, reinterpret_cast<SQLCHAR*>(const_cast<char*>("CREATE TABLE IF NOT EXISTS CDRTEST ( "
			"key01 VARCHAR PRIMARY KEY, "
			"value01 LONG, "
			"value02 LONG, "
			"value03 LONG, "
			"value04 LONG, "
			"value05 LONG, "
			"value06 LONG, "
			"value07 LONG, "
			"value08 LONG, "
			"value09 LONG, "
			"value10 LONG, "
			"value11 LONG, "
			"value12 LONG, "
			"value13 LONG, "
			"value14 LONG, "
			"value15 LONG, "
			"value16 LONG, "
			"value17 LONG, "
			"value18 LONG, "
			"value19 LONG, "
			"value20 LONG, "			
			"value21 VARCHAR, "
			"value22 VARCHAR, "
			"value23 VARCHAR, "
			"value24 VARCHAR, "
			"value25 VARCHAR, "
			"value26 VARCHAR, "
			"value27 VARCHAR, "
			"value28 VARCHAR, "
			"value29 VARCHAR, "
			"value30 VARCHAR, "				
			"value31 VARCHAR, "
			"value32 VARCHAR, "
			"value33 VARCHAR, "
			"value34 VARCHAR, "
			"value35 VARCHAR, "
			"value36 VARCHAR, "
			"value37 VARCHAR, "
			"value38 VARCHAR, "
			"value39 VARCHAR) "
			"WITH \"template=partitioned,CACHE_NAME=cdrtest_cache,WRITE_SYNCHRONIZATION_MODE=FULL_ASYNC\"")), SQL_NTS);
            CHECK_ERROR(retcode, "Fail to create table", hstmt, SQL_HANDLE_STMT);

    retcode = SQLExecDirect(hstmt, reinterpret_cast<SQLCHAR*>(const_cast<char*>("CREATE INDEX key01t_idx ON CDRTEST(key01) INLINE_SIZE 64")), SQL_NTS);
            CHECK_ERROR(retcode, "Fail to create index", hstmt, SQL_HANDLE_STMT);

    retcode = SQLEndTran(SQL_HANDLE_DBC, hdbc, SQL_COMMIT);
            CHECK_ERROR(retcode, "Fail to commit table, index creation", hdbc, SQL_HANDLE_DBC);

	insert_handle = sql_prepare(insert_sql);
	select_handle = sql_prepare(select_sql);
	update_handle = sql_prepare(update_sql);
	delete_handle = sql_prepare(delete_sql);

	long totalrec = numpartial * numcompleterec;
	time_t start, end;
	double seconds;

	long keyindex = 1;
	long partialindex = 1;
	char keyarr[25];
	char partialarr[25];
	char numarr[] = "1";
	const char* key = &keyarr[0];
	const char* partial = &partialarr[0];
	const char* num = &numarr[0];

	time(&start);

	for (int i = 1; i <= totalrec - numcompleterec; i++) {
		if(i%numpartial == 1) {
			sprintf(keyarr, "KEY00001_%d", keyindex++);
			sprintf(partialarr, "PARTIAL001_%d", partialindex++);
			//printf("insert: %s\n", keyarr);
			sql_set_parameter(insert_handle, 0, key);
			sql_set_parameter(insert_handle, 1, num);
			sql_set_parameter(insert_handle, 2, num);
			sql_set_parameter(insert_handle, 3, num);
			sql_set_parameter(insert_handle, 4, num);
			sql_set_parameter(insert_handle, 5, num);
			sql_set_parameter(insert_handle, 6, num);
			sql_set_parameter(insert_handle, 7, num);
			sql_set_parameter(insert_handle, 8, num);
			sql_set_parameter(insert_handle, 9, num);
			sql_set_parameter(insert_handle, 10, num);
			sql_set_parameter(insert_handle, 11, num);
			sql_set_parameter(insert_handle, 12, num);
			sql_set_parameter(insert_handle, 13, num);
			sql_set_parameter(insert_handle, 14, num);
			sql_set_parameter(insert_handle, 15, num);
			sql_set_parameter(insert_handle, 16, num);
			sql_set_parameter(insert_handle, 17, num);
			sql_set_parameter(insert_handle, 18, num);
			sql_set_parameter(insert_handle, 19, num);
			sql_set_parameter(insert_handle, 20, num);
			sql_set_parameter(insert_handle, 21, partial);		 
			sql_set_parameter(insert_handle, 22, partial);
			sql_set_parameter(insert_handle, 23, partial);
			sql_set_parameter(insert_handle, 24, partial);
			sql_set_parameter(insert_handle, 25, partial);
			sql_set_parameter(insert_handle, 26, partial);
			sql_set_parameter(insert_handle, 27, partial);
			sql_set_parameter(insert_handle, 28, partial);
			sql_set_parameter(insert_handle, 29, partial);
			sql_set_parameter(insert_handle, 30, partial);		
			sql_set_parameter(insert_handle, 31, partial);		
			sql_set_parameter(insert_handle, 32, partial);
			sql_set_parameter(insert_handle, 33, partial);
			sql_set_parameter(insert_handle, 34, partial);
			sql_set_parameter(insert_handle, 35, partial);
			sql_set_parameter(insert_handle, 36, partial);
			sql_set_parameter(insert_handle, 37, partial);
			sql_set_parameter(insert_handle, 38, partial);
			sql_set_parameter(insert_handle, 39, partial);		 
			sql_execute(insert_handle);			
		} else if(i%numpartial == 0) {
			partialindex = 1;
			sql_commit();
		} else {
			//printf("update: %s\n", keyarr);
			//sprintf(partialarr, "PARTIAL001_%d", partialindex++);
			sql_set_parameter(select_handle, 0, key); //cost 500 TPS
			sql_execute(select_handle); //cost 2500 TPS
			sql_fetch(select_handle); //cost 300 TPS
			char tmp[25];
			sprintf(tmp, "%ld", atol(sql_get_value(select_handle, 1)) + 1);
			sql_close(select_handle, 0);
			//sql_set_parameter(update_handle, 0, tmp);
			//sql_set_parameter(update_handle, 1, tmp);
			//sql_set_parameter(update_handle, 2, tmp);
			//sql_set_parameter(update_handle, 3, tmp);
			//sql_set_parameter(update_handle, 4, tmp);
			sql_set_parameter(update_handle, 0, "2");
			sql_set_parameter(update_handle, 1, "2");
			sql_set_parameter(update_handle, 2, "2");
			sql_set_parameter(update_handle, 3, "2");
			sql_set_parameter(update_handle, 4, "2");
			sql_set_parameter(update_handle, 5, partial);		
			sql_set_parameter(update_handle, 6, partial);
			sql_set_parameter(update_handle, 7, partial);
			sql_set_parameter(update_handle, 8, partial);
			sql_set_parameter(update_handle, 9, partial);
			sql_set_parameter(update_handle, 10, key);
			sql_execute(update_handle);
		}
	}
	
	for (int i = 1; i <= numcompleterec; i++) {
		sprintf(keyarr, "KEY00001_%d", i);
		//printf("delete: %s\n", keyarr);
		sql_set_parameter(delete_handle, 0, key);
		//sql_execute(delete_handle);	
	}

	sql_commit();
	
	time(&end);
	seconds = difftime(end, start);
	printf ("===================================================\n");
	printf ("NumPartial: %ld\nNumCompleteRec: %ld\nTolRec: %ld\nDuration: %.f seconds\nTPS: %.f\n", numpartial, numcompleterec, totalrec, seconds, totalrec/seconds);
	printf ("===================================================\n");
	
    // Free handles
    // Statement
    if (hstmt != SQL_NULL_HSTMT)
        SQLFreeHandle(SQL_HANDLE_STMT, hstmt);

    // Connection
    if (hdbc != SQL_NULL_HDBC) {
        SQLDisconnect(hdbc);
        SQLFreeHandle(SQL_HANDLE_DBC, hdbc);
    }

    // Environment
    if (henv != SQL_NULL_HENV)
        SQLFreeHandle(SQL_HANDLE_ENV, henv);

    return 0;
}

