public class CassandraClient extends TestBase{

    public static String cassandraKeyspace = "testkeyspace";
    public static String table = "testtable";
    public static String querystatments[];
    public static String dataMeta;
    public static String data[];
    public static String query;
    public static String line;
    public static Cluster cluster;
    public static Session session;
    public int noOfRows = 0;
    public ArrayList colName = new ArrayList();
      
    public CassandraClient(String dataFile, String queryFile, String node, String resFile, String clientMetaData) throws FileNotFoundException, IOException {
        super(dataFile, queryFile, node, resFile); 
        FileReader fr = new FileReader(clientMetaData);
        BufferedReader br = new BufferedReader(fr);

        LineNumberReader lnr = new LineNumberReader(new FileReader(new File(clientMetaData)));
        lnr.skip(Long.MAX_VALUE);
        int linesize = lnr.getLineNumber();
        linesize = linesize * 2;
        String[] parameters = new String[linesize];

        String parameter = br.readLine();
        parameter = parameter.replace(":", ";");
        parameters = parameter.split(";");
        // parameters[1] = cassandraKeyspace , parameters[3] = table name
        
        this.setCassandraKeyspace(parameters[1]);
        this.setTable(parameters[3]);
        connectToCluster();
    }
   
   void setCassandraKeyspace(String cassandraKeyspace){
       this.cassandraKeyspace = cassandraKeyspace;
   }
   void setTable(String table){
       this.table = table;
   }
    
    String getCassandraKeyspace(){
       return cassandraKeyspace;
   }
   String getTable(){
       return table;
   }
   
    
    private void connectToCluster(){
    /*
    Adds a contact point using the Cluster.Build auxiliary class
    Builds a cluster instance
    Retrieves metadata from the cluster
    Prints out: the name of the cluster and the datacenter, host name or IP address, and rack for each node in the cluster
    */
    System.out.println(super.getNode());
        cluster = Cluster.builder()
                .addContactPoints(super.getNode())
                .build();
        
        //Set connection timeout 3min (default is 5s)
        cluster.getConfiguration().getSocketOptions().setConnectTimeoutMillis(5*60*1000);
        
        //Set read (execute) timeout 3min (default is 12s)
        cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(30*60*1000);

                
        Metadata metadata = cluster.getMetadata(); 
        //session = cluster.connect(this.getCassandraKeyspace());
        System.out.printf("\nConnected to cluster: %s\n", metadata.getClusterName());
        
        for(Host host: metadata.getAllHosts()){
            System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack());
            
        }
                
    }
    
    @Override
    protected void closeConnection(){
                
        cluster.shutdown();
        
    }
  
    @Override
    protected void insert() throws FileNotFoundException, IOException{
        
        
        
          
        try {
            
        
            FileReader fr = new FileReader(super.getDataFile());
            BufferedReader br = new BufferedReader(fr);
            dataMeta = br.readLine();
            dataMeta = dataMeta.replace(":", ";");
            String[] metaArray = dataMeta.split(";");
            // maxBoolColumns:10;maxIntColumns:10;maxIntValue:100;rows:2000;seed:0;mean:18.0;stddev:0.8;
         
          int colsBool = Integer.parseInt(metaArray[1]);
          int colsInt = Integer.parseInt(metaArray[3]);
          noOfRows = Integer.parseInt(metaArray[7]);
         
        // get number of lines and create a string object
        
         
        data = new String[noOfRows];
        
          try {
              
              //Tabelle erstellen
             
              
                String cqlCreateTable = "CREATE TABLE " + this.getCassandraKeyspace() + "." + this.getTable() + " (key int, d int";
                for (int i = 1; i <= colsBool; i++) {
                    cqlCreateTable = cqlCreateTable + ", v" + i + " boolean";
                }
                for (int i = colsBool + 1; i <= colsBool + colsInt; i++) {
                    cqlCreateTable = cqlCreateTable + ", v" + i + " int";
                }
                cqlCreateTable = cqlCreateTable + ", PRIMARY KEY (key)) WITH caching='all';";
                //Start creating table
                session = cluster.connect(this.getCassandraKeyspace());
                session.execute(cqlCreateTable);
                
                
                if (_debug) {
                    System.out.println("Create TABLE erfolgreich: " + cqlCreateTable);
                }
                //Index erzeugen
                for (int i = 1; i <= colsBool; i++) {
                    String sqlCreateIndex = "CREATE INDEX ON " + this.getCassandraKeyspace() + "." + this.getTable() + " ( v" + i + " );";
                    session.execute(sqlCreateIndex);
                }
                for (int i = colsBool + 1; i <= colsBool + colsInt; i++) {
                    String sqlCreateIndex = "CREATE INDEX ON " + this.getCassandraKeyspace() + "." + this.getTable() + " ( v" + i + " );";
                    session.execute(sqlCreateIndex);
                }
                if (_debug) {
                    System.out.println("Create Index erfolgreich");
                }
                String sqlCreateIndex = "CREATE INDEX ON " + this.getCassandraKeyspace() + "." + this.getTable() + " ( d );";
                session.execute(sqlCreateIndex);


            }catch (Exception e) {
                System.out.println("Create Table/Index failed" + e);
        }     
     
            int lineNr = 1;
            String cqlInsert = "INSERT INTO " + this.getCassandraKeyspace() + "." + this.getTable() + " ( key, d";
            for (int i = 1; i <= colsBool + colsInt; i++) {
                cqlInsert = cqlInsert + ", v" + i + " ";
            }
            cqlInsert = cqlInsert + ") VALUES ( ?, ? ";
            for (int i = 1; i <= colsBool + colsInt; i++) {
                cqlInsert = cqlInsert + ", ? ";
            }
            cqlInsert = cqlInsert + " );";

            if (_debug) {
                System.out.println("Prepared Stmt:" + cqlInsert);
            }
            PreparedStatement prepStatement = session.prepare(cqlInsert);
              
            while ((line = br.readLine()) != null) {
                String tmp = line;
                line = line.replace(":", ";");
                data = line.split(";");
                BoundStatement bStmt = prepStatement.bind(lineNr, 1);

                for (int i = 0; i < data.length; i = i + 2) {
                    
                    if (Integer.parseInt(data[i].substring(1)) <= colsBool) {
                        bStmt.setBool(data[i], (Integer.parseInt(data[i + 1]) == 1 ? true : false));
                    } else {
                        bStmt.setInt(data[i], Integer.parseInt(data[i + 1]));
                    }
                }
                if (_debug) {
                    System.out.println("query:" + bStmt.toString());
                }               
                session.execute(bStmt);
                
            
                lineNr++;
               
            }
            
            
        } catch (IOException ex) {
            System.out.println("Error: " + ex);
        }    
        
    }   
    
    @Override
    protected void clearDatabase(){
        
        try {
                    String cqlDropTable = "Drop TABLE " + this.getCassandraKeyspace() + "." + this.getTable() + "";
                    session = cluster.connect(this.getCassandraKeyspace());
                    session.execute(cqlDropTable);
                    if (_debug) {
                        System.out.println("Drop TABLE : " + cqlDropTable);
                    }
                } catch (Exception e) {
                    System.out.println("No TABLE with this name ");
                } 
        
    }
    
    

    @Override
    protected void queryPreProcessing() throws IOException{
         
       System.out.println("Starting pre-process...." + tempQueryFileHolder.length);    
         
         String[] metaData = queryMeta.split(";");
        // maxBoolColumns:1;maxIntColumns:0;maxIntValue:0;nrPointQuery:100;nrRangeQuery:0;seed:42;mean:18.0;stddev:0.8;
        int maxBoolColumns = Integer.parseInt(metaData[1]);
      
                      
         querystatments = new String[noOfQueries];
         
        
        for (int j=0; j<tempQueryFileHolder.length; j++) {
            query = tempQueryFileHolder[j];
        for (int i = 1; i <= maxBoolColumns; i++) {
            query = query.replace("v" + i + "=1", "v" + i + "=true");
            query = query.replace("v" + i + "=0", "v" + i + "=false");           
        }            
           line = "SELECT count(*) FROM " + this.getTable() + " WHERE d=1 AND " + query + " ALLOW FILTERING;";  
           
            querystatments[j]= line;
          
                 
        }
      System.out.println("Ending pre-process ...\t querystatments are \t" + querystatments.length);
    }
    
    @Override
     protected void runQuery(){
        
        try{
            session = cluster.connect(this.getCassandraKeyspace());
            results = new int[noOfQueries];
       for (int i=0; i<querystatments.length; i++) { 
           
           ResultSet rs = session.execute(querystatments[i]);
           Iterator<Row> iter = rs.iterator();
           Row row = iter.next();
           results[i]= Integer.parseInt(row.toString().substring(row.toString().indexOf("[")+1,row.toString().indexOf("]")));
           
       }             
       
       }catch (NoHostAvailableException e){ 
           System.out.println(e.getMessage()+"\n"+ e.getCause());
      }
  
     
     }

     @Override
      protected void queryPostProcessing(){
          
      }
       
   
      }
     
     
    