This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apex-malhar.git
commit 42bfa560fc9e11f049d6a18725e6f1a009e75483 Author: Thomas Weise <t...@apache.org> AuthorDate: Sun Feb 21 20:42:27 2016 -0800 Kafka to JDBC exactly-once example. --- examples/exactly-once/src/assemble/appPackage.xml | 43 +++++++ .../java/com/example/myapexapp/Application.java | 103 +++++++++++++++++ .../example/myapexapp/RandomNumberGenerator.java | 47 ++++++++ .../src/main/resources/META-INF/properties.xml | 24 ++++ .../exactly-once/src/site/conf/my-app-conf1.xml | 11 ++ .../com/example/myapexapp/ApplicationTest.java | 124 +++++++++++++++++++++ .../src/test/resources/log4j.properties | 23 ++++ 7 files changed, 375 insertions(+) diff --git a/examples/exactly-once/src/assemble/appPackage.xml b/examples/exactly-once/src/assemble/appPackage.xml new file mode 100644 index 0000000..7ad071c --- /dev/null +++ b/examples/exactly-once/src/assemble/appPackage.xml @@ -0,0 +1,43 @@ +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>appPackage</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${basedir}/target/</directory> + <outputDirectory>/app</outputDirectory> + <includes> + <include>${project.artifactId}-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/target/deps</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/site/conf</directory> + <outputDirectory>/conf</outputDirectory> + <includes> + <include>*.xml</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/META-INF</directory> + <outputDirectory>/META-INF</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/app</directory> + <outputDirectory>/app</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/resources</directory> + <outputDirectory>/resources</outputDirectory> + </fileSet> + </fileSets> + +</assembly> + diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java b/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java new file mode 100644 index 0000000..8050d67 --- /dev/null +++ b/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java @@ -0,0 +1,103 @@ +/** + * Put your copyright and license info here. + */ +package com.example.myapexapp; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Map; + +import org.apache.commons.lang.mutable.MutableInt; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator; +import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.io.IdempotentStorageManager; +import com.datatorrent.lib.util.BaseUniqueKeyCounter; +import com.datatorrent.lib.util.KeyValPair; + +@ApplicationAnnotation(name="ExactlyOnceExampleApplication") +public class Application implements StreamingApplication +{ + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("kafkaInput", new KafkaSinglePortStringInputOperator()); + kafkaInput.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager()); + UniqueCounter<String> count = dag.addOperator("count", new UniqueCounter<String>()); + CountStoreOperator store = dag.addOperator("store", new CountStoreOperator()); + store.setStore(new JdbcTransactionalStore()); + ConsoleOutputOperator cons = dag.addOperator("console", new ConsoleOutputOperator()); + dag.addStream("words", kafkaInput.outputPort, count.data); + dag.addStream("counts", count.count, store.input, cons.input); + } + + public static class CountStoreOperator extends AbstractJdbcTransactionableOutputOperator<KeyValPair<String, Integer>> + { + public static final String SQL = + "MERGE INTO words USING (VALUES ?, ?) I (word, wcount)" + + " ON (words.word=I.word)" + + " WHEN MATCHED THEN UPDATE SET words.wcount = words.wcount + I.wcount" + + " WHEN NOT MATCHED THEN INSERT (word, wcount) VALUES (I.word, I.wcount)"; + + @Override + protected String getUpdateCommand() + { + return SQL; + } + + @Override + protected void setStatementParameters(PreparedStatement statement, KeyValPair<String, Integer> tuple) throws SQLException + { + statement.setString(1, tuple.getKey()); + statement.setInt(2, tuple.getValue()); + } + } + + public static class UniqueCounter<K> extends BaseUniqueKeyCounter<K> + { + /** + * The input port which receives incoming tuples. + */ + public final transient DefaultInputPort<K> data = new DefaultInputPort<K>() + { + /** + * Reference counts tuples + */ + @Override + public void process(K tuple) + { + processTuple(tuple); + } + + }; + + public final transient DefaultOutputPort<KeyValPair<K, Integer>> count = new DefaultOutputPort<KeyValPair<K, Integer>>() + { + @Override + public Unifier<KeyValPair<K, Integer>> getUnifier() + { + throw new UnsupportedOperationException("not partitionable"); + } + }; + + @Override + public void endWindow() + { + for (Map.Entry<K, MutableInt> e: map.entrySet()) { + count.emit(new KeyValPair<>(e.getKey(), e.getValue().toInteger())); + } + map.clear(); + } + + } + +} diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/RandomNumberGenerator.java b/examples/exactly-once/src/main/java/com/example/myapexapp/RandomNumberGenerator.java new file mode 100644 index 0000000..eed344b --- /dev/null +++ b/examples/exactly-once/src/main/java/com/example/myapexapp/RandomNumberGenerator.java @@ -0,0 +1,47 @@ +/** + * Put your copyright and license info here. + */ +package com.example.myapexapp; + +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.common.util.BaseOperator; + +/** + * This is a simple operator that emits random number. + */ +public class RandomNumberGenerator extends BaseOperator implements InputOperator +{ + private int numTuples = 100; + private transient int count = 0; + + public final transient DefaultOutputPort<Double> out = new DefaultOutputPort<Double>(); + + @Override + public void beginWindow(long windowId) + { + count = 0; + } + + @Override + public void emitTuples() + { + if (count++ < numTuples) { + out.emit(Math.random()); + } + } + + public int getNumTuples() + { + return numTuples; + } + + /** + * Sets the number of tuples to be emitted every window. + * @param numTuples number of tuples + */ + public void setNumTuples(int numTuples) + { + this.numTuples = numTuples; + } +} diff --git a/examples/exactly-once/src/main/resources/META-INF/properties.xml b/examples/exactly-once/src/main/resources/META-INF/properties.xml new file mode 100644 index 0000000..876c39a --- /dev/null +++ b/examples/exactly-once/src/main/resources/META-INF/properties.xml @@ -0,0 +1,24 @@ +<?xml version="1.0"?> +<configuration> + <!-- + <property> + <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name> + <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value> + </property> + --> + <!-- memory assigned to app master + <property> + <name>dt.attr.MASTER_MEMORY_MB</name> + <value>1024</value> + </property> + --> + <property> + <name>dt.application.MyFirstApplication.operator.randomGenerator.prop.numTuples</name> + <value>1000</value> + </property> + <property> + <name>dt.application.MyFirstApplication.operator.console.prop.stringFormat</name> + <value>hello world: %s</value> + </property> +</configuration> + diff --git a/examples/exactly-once/src/site/conf/my-app-conf1.xml b/examples/exactly-once/src/site/conf/my-app-conf1.xml new file mode 100644 index 0000000..ccb2b66 --- /dev/null +++ b/examples/exactly-once/src/site/conf/my-app-conf1.xml @@ -0,0 +1,11 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<configuration> + <property> + <name>dt.attr.MASTER_MEMORY_MB</name> + <value>1024</value> + </property> + <property> + <name>dt.application.MyFirstApplication.operator.randomGenerator.prop.numTuples</name> + <value>1000</value> + </property> +</configuration> diff --git a/examples/exactly-once/src/test/java/com/example/myapexapp/ApplicationTest.java b/examples/exactly-once/src/test/java/com/example/myapexapp/ApplicationTest.java new file mode 100644 index 0000000..c5aa69c --- /dev/null +++ b/examples/exactly-once/src/test/java/com/example/myapexapp/ApplicationTest.java @@ -0,0 +1,124 @@ +/** + * Put your copyright and license info here. + */ +package com.example.myapexapp; + +import java.io.File; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.HashSet; + +import javax.validation.ConstraintViolationException; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.LocalMode; +import com.datatorrent.contrib.kafka.KafkaOperatorTestBase; +import com.datatorrent.contrib.kafka.KafkaTestProducer; +import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; +import com.example.myapexapp.Application; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +/** + * Test the application in local mode. + */ +public class ApplicationTest +{ + private final KafkaOperatorTestBase kafkaLauncher = new KafkaOperatorTestBase(); + private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class); + private static final String KAFKA_TOPIC = "exactly-once-test"; + private static final String DB_DRIVER = "org.hsqldb.jdbcDriver"; + private static final String DB_URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true"; + private static final String TABLE_NAME = "WORDS"; + + @Before + public void beforeTest() throws Exception { + kafkaLauncher.baseDir = "target/" + this.getClass().getName(); + FileUtils.deleteDirectory(new File(kafkaLauncher.baseDir)); + kafkaLauncher.startZookeeper(); + kafkaLauncher.startKafkaServer(); + kafkaLauncher.createTopic(0, KAFKA_TOPIC); + + // setup hsqldb + Class.forName(DB_DRIVER).newInstance(); + + Connection con = DriverManager.getConnection(DB_URL); + Statement stmt = con.createStatement(); + + String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( " + + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, " + + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, " + + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, " + + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " + + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") " + + ")"; + stmt.executeUpdate(createMetaTable); + + String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + + "(word VARCHAR(255) not NULL, wcount INTEGER, PRIMARY KEY ( word ))"; + stmt.executeUpdate(createTable); + + } + + @After + public void afterTest() { + kafkaLauncher.stopKafkaServer(); + kafkaLauncher.stopZookeeper(); + } + + @Test + public void testApplication() throws Exception { + try { + // produce some test data + KafkaTestProducer p = new KafkaTestProducer(KAFKA_TOPIC); + String[] words = "count the words from kafka and store them in the db".split("\\s+"); + p.setMessages(Lists.newArrayList(words)); + new Thread(p).start(); + + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); + conf.set("dt.operator.kafkaInput.prop.topic", KAFKA_TOPIC); + conf.set("dt.operator.kafkaInput.prop.zookeeper", "localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]); + conf.set("dt.operator.kafkaInput.prop.maxTuplesPerWindow", "1"); // consume one word per window + conf.set("dt.operator.store.prop.store.databaseDriver", DB_DRIVER); + conf.set("dt.operator.store.prop.store.databaseUrl", DB_URL); + + lma.prepareDAG(new Application(), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); // test will terminate after results are available + + HashSet<String> wordsSet = Sets.newHashSet(words); + Connection con = DriverManager.getConnection(DB_URL); + Statement stmt = con.createStatement(); + int rowCount = 0; + long timeout = System.currentTimeMillis() + 30000; // 30s timeout + while (rowCount < wordsSet.size() && timeout > System.currentTimeMillis()) { + Thread.sleep(1000); + String countQuery = "SELECT count(*) from " + TABLE_NAME; + ResultSet resultSet = stmt.executeQuery(countQuery); + resultSet.next(); + rowCount = resultSet.getInt(1); + resultSet.close(); + LOG.info("current row count in {} is {}", TABLE_NAME, rowCount); + } + Assert.assertEquals("number of words", wordsSet.size(), rowCount); + + lc.shutdown(); + + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + +} diff --git a/examples/exactly-once/src/test/resources/log4j.properties b/examples/exactly-once/src/test/resources/log4j.properties new file mode 100644 index 0000000..dd5910b --- /dev/null +++ b/examples/exactly-once/src/test/resources/log4j.properties @@ -0,0 +1,23 @@ +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +log4j.logger.org=info +log4j.logger.kafka.server=info +log4j.logger.kafka.request.logger=info +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=info -- To stop receiving notification emails like this one, please contact "commits@apex.apache.org" <commits@apex.apache.org>.