package com.test;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

/**
 * Flink Table API job that joins the {@code Person} and {@code Relationship}
 * upsert-kafka tables and writes the joined rows to two sinks: the JDBC table
 * {@code Correlation} and the upsert-kafka table {@code correlator}.
 *
 * <p>Both {@code INSERT} statements are collected in a single
 * {@link StatementSet} and submitted with one {@code execute()} call.
 */
public class TwoSinks2 {

	/** Kafka brokers shared by every upsert-kafka table declared by this job. */
	private static final String BOOTSTRAP_SERVERS = "kafka-cp-kafka:9092";

	/** Confluent schema registry used by the avro-confluent value format. */
	private static final String SCHEMA_REGISTRY_URL = "http://kafka-cp-schema-registry:8081";

	/**
	 * Join that feeds both sinks. The selected columns map positionally onto the
	 * three STRING columns of each sink table.
	 */
	private static final String JOIN_QUERY = "SELECT p.id, p.name, r.relationshipType "
			+ "FROM Person p JOIN Relationship r ON p.id = r.id";

	/**
	 * Registers the source and sink tables, then submits the join into both sinks.
	 *
	 * @param args command-line arguments (unused)
	 * @throws Exception if a DDL statement is rejected or the job cannot be submitted
	 */
	public static void main(String[] args) throws Exception {

		EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
		TableEnvironment tEnv = TableEnvironment.create(settings);

		// Sources. The columns are declared at top level (not wrapped in a ROW) so
		// that PRIMARY KEY (id) and the join's p.id / r.id references resolve.
		tEnv.executeSql(upsertKafkaDdl("Person", "person", "id", "id STRING", "name STRING"));
		tEnv.executeSql(
				upsertKafkaDdl("Relationship", "relationship", "id", "id STRING", "relationshipType STRING"));

		// Sink 1: JDBC table.
		tEnv.executeSql("CREATE TABLE Correlation (\n"
				+ "  correlate_id STRING,\n"
				+ "  name STRING,\n"
				+ "  relationship STRING,\n"
				+ "  PRIMARY KEY (correlate_id) NOT ENFORCED\n"
				+ ") WITH (\n"
				+ "  'connector'  = 'jdbc',\n"
				+ "  'url'        = 'sampleurl',\n"
				+ "  'table-name' = 'testschema.correlate',\n"
				+ "  'driver'     = 'org.postgresql.Driver',\n"
				+ "  'username'   = 'xxxxxxx',\n"
				+ "  'password'   = 'yyyyyyyy'\n"
				+ ")");

		// Sink 2: upsert-kafka topic.
		tEnv.executeSql(upsertKafkaDdl("correlator", "correlator", "correlator_id", "correlator_id STRING",
				"name STRING", "relationship STRING"));

		// A bare SELECT passed to executeSql() never writes to a sink table. Each sink
		// needs its own INSERT INTO; grouping them in a StatementSet submits them together.
		// NOTE(review): fanning the same join out to both sinks is inferred from the class
		// name and the previously unused StatementSet import - confirm this is the intent.
		StatementSet sinks = tEnv.createStatementSet();
		sinks.addInsertSql("INSERT INTO Correlation " + JOIN_QUERY);
		sinks.addInsertSql("INSERT INTO correlator " + JOIN_QUERY);

		// NOTE(review): execute() returns a TableResult once the job is submitted; when
		// running from an IDE you may need to call await() on it - confirm for your deployment.
		sinks.execute();
	}

	/**
	 * Builds the {@code CREATE TABLE} statement for an upsert-kafka table that uses
	 * a raw key format and an avro-confluent value format.
	 *
	 * @param tableName name of the table to create
	 * @param topic     Kafka topic backing the table
	 * @param keyColumn column declared as the (not enforced) primary key
	 * @param columns   column definitions, e.g. {@code "id STRING"}
	 * @return the DDL text, without a trailing comma in the WITH clause
	 */
	private static String upsertKafkaDdl(String tableName, String topic, String keyColumn, String... columns) {
		return "CREATE TABLE " + tableName + " (\n"
				+ "  " + String.join(",\n  ", columns) + ",\n"
				+ "  PRIMARY KEY (" + keyColumn + ") NOT ENFORCED\n"
				+ ") WITH (\n"
				+ "  'connector' = 'upsert-kafka',\n"
				+ "  'topic' = '" + topic + "',\n"
				+ "  'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "',\n"
				+ "  'key.format' = 'raw',\n"
				+ "  'value.format' = 'avro-confluent',\n"
				+ "  'value.avro-confluent.url' = '" + SCHEMA_REGISTRY_URL + "'\n"
				+ ")";
	}

}
