Update hello-samza with latest code

RB: https://reviews.apache.org/r/68867/
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/b6acf190 Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/b6acf190 Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/b6acf190 Branch: refs/heads/master Commit: b6acf190ceeb57c30dadbc4be5d256c3bf9b3304 Parents: a5e5e56 Author: xinyuiscool <xinyuliu...@gmail.com> Authored: Thu Sep 27 17:26:35 2018 -0700 Committer: xinyuiscool <xinyuliu...@gmail.com> Committed: Mon Oct 1 16:14:26 2018 -0700 ---------------------------------------------------------------------- build.gradle | 4 +- .../config/pageview-adclick-joiner.properties | 10 +-- src/main/config/pageview-filter.properties | 10 +-- .../pageview-profile-table-joiner.properties | 8 -- src/main/config/pageview-sessionizer.properties | 10 +-- .../config/stock-price-table-joiner.properties | 8 -- .../config/tumbling-pageview-counter.properties | 10 +-- ...ikipedia-application-local-runner.properties | 21 ----- .../config/wikipedia-application.properties | 22 ----- .../samza/examples/azure/AzureApplication.java | 28 ++++-- .../examples/azure/AzureZKLocalApplication.java | 4 +- .../cookbook/PageViewAdClickJoiner.java | 43 ++++++++-- .../examples/cookbook/PageViewFilterApp.java | 39 +++++++-- .../cookbook/PageViewProfileTableJoiner.java | 54 +++++++++--- .../cookbook/PageViewSessionizerApp.java | 45 +++++++--- .../cookbook/StockPriceTableJoiner.java | 67 ++++++++++----- .../cookbook/TumblingPageViewCounterApp.java | 40 +++++++-- .../application/WikipediaApplication.java | 89 +++++++++++--------- .../WikipediaZkLocalApplication.java | 6 +- .../wikipedia/model/WikipediaParser.java | 1 + .../system/WikipediaInputDescriptor.java | 41 +++++++++ .../system/WikipediaSystemDescriptor.java | 51 +++++++++++ 22 files changed, 389 insertions(+), 222 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/b6acf190/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 9d1f543..6201bc5 100644 --- a/build.gradle +++ b/build.gradle @@ -48,6 +48,8 @@ dependencies { compile(group: 'org.schwering', name: 'irclib', version: '1.10') compile(group: 'org.apache.samza', name: 'samza-api', version: "$SAMZA_VERSION") compile(group: 'org.apache.samza', name: 'samza-kv_2.11', version: "$SAMZA_VERSION") + compile(group: 'org.apache.samza', name: 'samza-kafka_2.11', version: "$SAMZA_VERSION") + compile(group: 'org.apache.samza', name: 'samza-kv-rocksdb_2.11', version: "$SAMZA_VERSION") explode (group: 'org.apache.samza', name: 'samza-shell', ext: 'tgz', classifier: 'dist', version: "$SAMZA_VERSION") @@ -55,8 +57,6 @@ dependencies { runtime(group: 'org.apache.samza', name: 'samza-log4j', version: "$SAMZA_VERSION") runtime(group: 'org.apache.samza', name: 'samza-shell', version: "$SAMZA_VERSION") runtime(group: 'org.apache.samza', name: 'samza-yarn_2.11', version: "$SAMZA_VERSION") - runtime(group: 'org.apache.samza', name: 'samza-kv-rocksdb_2.11', version: "$SAMZA_VERSION") - runtime(group: 'org.apache.samza', name: 'samza-kafka_2.11', version: "$SAMZA_VERSION") runtime(group: 'org.apache.kafka', name: 'kafka_2.11', version: "$KAFKA_VERSION") runtime(group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: "$HADOOP_VERSION") } http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/b6acf190/src/main/config/pageview-adclick-joiner.properties ---------------------------------------------------------------------- diff --git a/src/main/config/pageview-adclick-joiner.properties b/src/main/config/pageview-adclick-joiner.properties index eba7b0b..8764974 100644 --- a/src/main/config/pageview-adclick-joiner.properties +++ b/src/main/config/pageview-adclick-joiner.properties @@ -19,18 +19,10 @@ 
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory job.name=pageview-adclick-joiner job.container.count=2 -job.default.system=kafka -job.coordinator.system=kafka # YARN yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz # Task app.class=samza.examples.cookbook.PageViewAdClickJoiner -task.window.ms=2000 - -# Kafka System -systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory -systems.kafka.consumer.zookeeper.connect=localhost:2181 -systems.kafka.producer.bootstrap.servers=localhost:9092 -systems.kafka.default.stream.replication.factor=1 \ No newline at end of file +task.window.ms=2000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/b6acf190/src/main/config/pageview-filter.properties ---------------------------------------------------------------------- diff --git a/src/main/config/pageview-filter.properties b/src/main/config/pageview-filter.properties index 331ee1a..84228fa 100644 --- a/src/main/config/pageview-filter.properties +++ b/src/main/config/pageview-filter.properties @@ -19,18 +19,10 @@ job.factory.class=org.apache.samza.job.yarn.YarnJobFactory job.name=pageview-filter job.container.count=2 -job.default.system=kafka -job.coordinator.system=kafka # YARN yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz # Task app.class=samza.examples.cookbook.PageViewFilterApp -task.window.ms=2000 - -# Kafka System -systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory -systems.kafka.consumer.zookeeper.connect=localhost:2181 -systems.kafka.producer.bootstrap.servers=localhost:9092 -systems.kafka.default.stream.replication.factor=1 \ No newline at end of file +task.window.ms=2000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/b6acf190/src/main/config/pageview-profile-table-joiner.properties 
---------------------------------------------------------------------- diff --git a/src/main/config/pageview-profile-table-joiner.properties b/src/main/config/pageview-profile-table-joiner.properties index 7cec601..d8c0fcf 100644 --- a/src/main/config/pageview-profile-table-joiner.properties +++ b/src/main/config/pageview-profile-table-joiner.properties @@ -19,17 +19,9 @@ job.factory.class=org.apache.samza.job.yarn.YarnJobFactory job.name=pageview-profile-table-joiner job.container.count=2 -job.default.system=kafka -job.coordinator.system=kafka # YARN yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz # Task app.class=samza.examples.cookbook.PageViewProfileTableJoiner - -# Kafka System -systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory -systems.kafka.consumer.zookeeper.connect=localhost:2181 -systems.kafka.producer.bootstrap.servers=localhost:9092 -systems.kafka.default.stream.replication.factor=1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/b6acf190/src/main/config/pageview-sessionizer.properties ---------------------------------------------------------------------- diff --git a/src/main/config/pageview-sessionizer.properties b/src/main/config/pageview-sessionizer.properties index 420cdde..74109ad 100644 --- a/src/main/config/pageview-sessionizer.properties +++ b/src/main/config/pageview-sessionizer.properties @@ -19,18 +19,10 @@ job.factory.class=org.apache.samza.job.yarn.YarnJobFactory job.name=pageview-sessionizer job.container.count=2 -job.default.system=kafka -job.coordinator.system=kafka # YARN yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz # Task app.class=samza.examples.cookbook.PageViewSessionizerApp -task.window.ms=2000 - -# Kafka System -systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory -systems.kafka.consumer.zookeeper.connect=localhost:2181 
-systems.kafka.producer.bootstrap.servers=localhost:9092 -systems.kafka.default.stream.replication.factor=1 \ No newline at end of file +task.window.ms=2000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/b6acf190/src/main/config/stock-price-table-joiner.properties ---------------------------------------------------------------------- diff --git a/src/main/config/stock-price-table-joiner.properties b/src/main/config/stock-price-table-joiner.properties index f9bd684..410cdd4 100644 --- a/src/main/config/stock-price-table-joiner.properties +++ b/src/main/config/stock-price-table-joiner.properties @@ -19,17 +19,9 @@ job.factory.class=org.apache.samza.job.yarn.YarnJobFactory job.name=stock-price-table-joiner job.container.count=1 -job.default.system=kafka -job.coordinator.system=kafka # YARN yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz # Task app.class=samza.examples.cookbook.StockPriceTableJoiner - -# Kafka System -systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory -systems.kafka.consumer.zookeeper.connect=localhost:2181 -systems.kafka.producer.bootstrap.servers=localhost:9092 -systems.kafka.default.stream.replication.factor=1 http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/b6acf190/src/main/config/tumbling-pageview-counter.properties ---------------------------------------------------------------------- diff --git a/src/main/config/tumbling-pageview-counter.properties b/src/main/config/tumbling-pageview-counter.properties index b58dbe9..70ca290 100644 --- a/src/main/config/tumbling-pageview-counter.properties +++ b/src/main/config/tumbling-pageview-counter.properties @@ -19,18 +19,10 @@ job.factory.class=org.apache.samza.job.yarn.YarnJobFactory job.name=tumbling-pageview-counter job.container.count=2 -job.default.system=kafka -job.coordinator.system=kafka # YARN 
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz # Task app.class=samza.examples.cookbook.TumblingPageViewCounterApp -task.window.ms=2000 - -# Kafka System -systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory -systems.kafka.consumer.zookeeper.connect=localhost:2181 -systems.kafka.producer.bootstrap.servers=localhost:9092 -systems.kafka.default.stream.replication.factor=1 \ No newline at end of file +task.window.ms=2000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/b6acf190/src/main/config/wikipedia-application-local-runner.properties ---------------------------------------------------------------------- diff --git a/src/main/config/wikipedia-application-local-runner.properties b/src/main/config/wikipedia-application-local-runner.properties index b770f13..c5be904 100644 --- a/src/main/config/wikipedia-application-local-runner.properties +++ b/src/main/config/wikipedia-application-local-runner.properties @@ -28,27 +28,6 @@ task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContain serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory -# Wikipedia System -systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory -systems.wikipedia.host=irc.wikimedia.org -systems.wikipedia.port=6667 - -# Kafka System -systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory -systems.kafka.consumer.zookeeper.connect=localhost:2181/ -systems.kafka.producer.bootstrap.servers=localhost:9092 -systems.kafka.default.stream.replication.factor=1 - -# Streams -streams.en-wikipedia.samza.system=wikipedia -streams.en-wikipedia.samza.physical.name=#en.wikipedia - -streams.en-wiktionary.samza.system=wikipedia -streams.en-wiktionary.samza.physical.name=#en.wiktionary - 
-streams.en-wikinews.samza.system=wikipedia -streams.en-wikinews.samza.physical.name=#en.wikinews - # Key-value storage stores.wikipedia-stats.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory stores.wikipedia-stats.changelog=kafka.wikipedia-stats-changelog http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/b6acf190/src/main/config/wikipedia-application.properties ---------------------------------------------------------------------- diff --git a/src/main/config/wikipedia-application.properties b/src/main/config/wikipedia-application.properties index 841fcc5..0d9bf61 100644 --- a/src/main/config/wikipedia-application.properties +++ b/src/main/config/wikipedia-application.properties @@ -21,36 +21,14 @@ app.runner.class=org.apache.samza.runtime.RemoteApplicationRunner job.factory.class=org.apache.samza.job.yarn.YarnJobFactory job.name=wikipedia-application -job.default.system=kafka # YARN yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz -# Wikipedia System -systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory -systems.wikipedia.host=irc.wikimedia.org -systems.wikipedia.port=6667 - -# Kafka System -systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory -systems.kafka.consumer.zookeeper.connect=localhost:2181 -systems.kafka.producer.bootstrap.servers=localhost:9092 -systems.kafka.default.stream.replication.factor=1 - # Serializers serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory -# Streams which are not on default system or have special characters in the physical name. 
-streams.en-wikipedia.samza.system=wikipedia -streams.en-wikipedia.samza.physical.name=#en.wikipedia - -streams.en-wiktionary.samza.system=wikipedia -streams.en-wiktionary.samza.physical.name=#en.wiktionary - -streams.en-wikinews.samza.system=wikipedia -streams.en-wikinews.samza.physical.name=#en.wikinews - # Key-value storage stores.wikipedia-stats.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory stores.wikipedia-stats.changelog=kafka.wikipedia-stats-changelog http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/b6acf190/src/main/java/samza/examples/azure/AzureApplication.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/azure/AzureApplication.java b/src/main/java/samza/examples/azure/AzureApplication.java index 9f565fe..12d293b 100644 --- a/src/main/java/samza/examples/azure/AzureApplication.java +++ b/src/main/java/samza/examples/azure/AzureApplication.java @@ -19,12 +19,15 @@ package samza.examples.azure; +import java.util.HashMap; import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.descriptors.GenericInputDescriptor; +import org.apache.samza.operators.descriptors.GenericOutputDescriptor; +import org.apache.samza.operators.descriptors.GenericSystemDescriptor; import org.apache.samza.serializers.ByteSerde; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; @@ -38,14 +41,25 @@ public class AzureApplication implements StreamApplication { private static final String OUTPUT_STREAM_ID = "output-stream"; @Override - public void init(StreamGraph graph, Config config) { + public void 
describe(StreamApplicationDescriptor appDescriptor) { + HashMap<String, String> systemConfigs = new HashMap<>(); - // Input - MessageStream<KV<String, byte[]>> eventhubInput = graph.getInputStream(INPUT_STREAM_ID); + GenericSystemDescriptor systemDescriptor = + new GenericSystemDescriptor("eventhubs", "org.apache.samza.system.eventhub.EventHubSystemFactory"); + + KVSerde<String, byte[]> serde = KVSerde.of(new StringSerde(), new ByteSerde()); + + GenericInputDescriptor<KV<String, byte[]>> inputDescriptor = + systemDescriptor.getInputDescriptor(INPUT_STREAM_ID, serde); + GenericOutputDescriptor<KV<String, byte[]>> outputDescriptor = + systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, serde); + + + // Input + MessageStream<KV<String, byte[]>> eventhubInput = appDescriptor.getInputStream(inputDescriptor); // Output - OutputStream<KV<String, byte[]>> eventhubOutput = - graph.getOutputStream(OUTPUT_STREAM_ID, KVSerde.of(new StringSerde(), new ByteSerde())); + OutputStream<KV<String, byte[]>> eventhubOutput = appDescriptor.getOutputStream(outputDescriptor); // Send eventhubInput http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/b6acf190/src/main/java/samza/examples/azure/AzureZKLocalApplication.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/azure/AzureZKLocalApplication.java b/src/main/java/samza/examples/azure/AzureZKLocalApplication.java index 3d4f8b0..01075e2 100644 --- a/src/main/java/samza/examples/azure/AzureZKLocalApplication.java +++ b/src/main/java/samza/examples/azure/AzureZKLocalApplication.java @@ -32,10 +32,10 @@ public class AzureZKLocalApplication { OptionSet options = cmdLine.parser().parse(args); Config config = cmdLine.loadConfig(options); - LocalApplicationRunner runner = new LocalApplicationRunner(config); AzureApplication app = new AzureApplication(); + LocalApplicationRunner runner = new LocalApplicationRunner(app, config); + runner.run(); - runner.run(app); 
runner.waitForFinish(); } http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/b6acf190/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java index f6c3810..4c5d86b 100644 --- a/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java +++ b/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java @@ -19,19 +19,26 @@ package samza.examples.cookbook; import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import samza.examples.cookbook.data.AdClick; import samza.examples.cookbook.data.PageView; import java.time.Duration; +import java.util.List; +import java.util.Map; /** * In this example, we join a stream of Page views with a stream of Ad clicks. 
For instance, this is helpful for @@ -71,21 +78,39 @@ import java.time.Duration; * */ public class PageViewAdClickJoiner implements StreamApplication { + private static final String KAFKA_SYSTEM_NAME = "kafka"; + private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181"); + private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092"); + private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1"); - private static final String PAGEVIEW_TOPIC = "pageview-join-input"; - private static final String AD_CLICK_TOPIC = "adclick-join-input"; - private static final String OUTPUT_TOPIC = "pageview-adclick-join-output"; + private static final String PAGEVIEW_STREAM_ID = "pageview-join-input"; + private static final String ADCLICK_STREAM_ID = "adclick-join-input"; + private static final String OUTPUT_STREAM_ID = "pageview-adclick-join-output"; @Override - public void init(StreamGraph graph, Config config) { + public void describe(StreamApplicationDescriptor appDescriptor) { StringSerde stringSerde = new StringSerde(); JsonSerdeV2<PageView> pageViewSerde = new JsonSerdeV2<>(PageView.class); JsonSerdeV2<AdClick> adClickSerde = new JsonSerdeV2<>(AdClick.class); JsonSerdeV2<JoinResult> joinResultSerde = new JsonSerdeV2<>(JoinResult.class); - MessageStream<PageView> pageViews = graph.getInputStream(PAGEVIEW_TOPIC, pageViewSerde); - MessageStream<AdClick> adClicks = graph.getInputStream(AD_CLICK_TOPIC, adClickSerde); - OutputStream<JoinResult> joinResults = graph.getOutputStream(OUTPUT_TOPIC, joinResultSerde); + KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(KAFKA_SYSTEM_NAME) + .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT) + .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS) + .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS); + + KafkaInputDescriptor<PageView> pageViewInputDescriptor = + 
kafkaSystemDescriptor.getInputDescriptor(PAGEVIEW_STREAM_ID, pageViewSerde); + KafkaInputDescriptor<AdClick> adClickInputDescriptor = + kafkaSystemDescriptor.getInputDescriptor(ADCLICK_STREAM_ID, adClickSerde); + KafkaOutputDescriptor<JoinResult> joinResultOutputDescriptor = + kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, joinResultSerde); + + appDescriptor.withDefaultSystem(kafkaSystemDescriptor); + + MessageStream<PageView> pageViews = appDescriptor.getInputStream(pageViewInputDescriptor); + MessageStream<AdClick> adClicks = appDescriptor.getInputStream(adClickInputDescriptor); + OutputStream<JoinResult> joinResults = appDescriptor.getOutputStream(joinResultOutputDescriptor); JoinFunction<String, PageView, AdClick, JoinResult> pageViewAdClickJoinFunction = new JoinFunction<String, PageView, AdClick, JoinResult>() { http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/b6acf190/src/main/java/samza/examples/cookbook/PageViewFilterApp.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/PageViewFilterApp.java b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java index a2accfd..e131a8f 100644 --- a/src/main/java/samza/examples/cookbook/PageViewFilterApp.java +++ b/src/main/java/samza/examples/cookbook/PageViewFilterApp.java @@ -19,16 +19,24 @@ package samza.examples.cookbook; import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraph; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import 
org.apache.samza.system.kafka.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import samza.examples.cookbook.data.PageView; +import java.util.List; +import java.util.Map; + /** * In this example, we demonstrate re-partitioning a stream of page views and filtering out some bad events in the stream. * @@ -59,17 +67,32 @@ import samza.examples.cookbook.data.PageView; * </ol> */ public class PageViewFilterApp implements StreamApplication { + private static final String KAFKA_SYSTEM_NAME = "kafka"; + private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181"); + private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092"); + private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1"); - private static final String INPUT_TOPIC = "pageview-filter-input"; - private static final String OUTPUT_TOPIC = "pageview-filter-output"; + private static final String INPUT_STREAM_ID = "pageview-filter-input"; + private static final String OUTPUT_STREAM_ID = "pageview-filter-output"; private static final String INVALID_USER_ID = "invalidUserId"; @Override - public void init(StreamGraph graph, Config config) { - graph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class))); + public void describe(StreamApplicationDescriptor appDescriptor) { + KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(KAFKA_SYSTEM_NAME) + .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT) + .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS) + .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS); + + KVSerde<String, PageView> serde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class)); + KafkaInputDescriptor<KV<String, PageView>> inputDescriptor = + 
kafkaSystemDescriptor.getInputDescriptor(INPUT_STREAM_ID, serde); + KafkaOutputDescriptor<KV<String, PageView>> outputDescriptor = + kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, serde); + + appDescriptor.withDefaultSystem(kafkaSystemDescriptor); - MessageStream<KV<String, PageView>> pageViews = graph.getInputStream(INPUT_TOPIC); - OutputStream<KV<String, PageView>> filteredPageViews = graph.getOutputStream(OUTPUT_TOPIC); + MessageStream<KV<String, PageView>> pageViews = appDescriptor.getInputStream(inputDescriptor); + OutputStream<KV<String, PageView>> filteredPageViews = appDescriptor.getOutputStream(outputDescriptor); pageViews .partitionBy(kv -> kv.value.userId, kv -> kv.value, "pageview") http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/b6acf190/src/main/java/samza/examples/cookbook/PageViewProfileTableJoiner.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/PageViewProfileTableJoiner.java b/src/main/java/samza/examples/cookbook/PageViewProfileTableJoiner.java index 86deb61..f67e9c1 100644 --- a/src/main/java/samza/examples/cookbook/PageViewProfileTableJoiner.java +++ b/src/main/java/samza/examples/cookbook/PageViewProfileTableJoiner.java @@ -19,21 +19,29 @@ package samza.examples.cookbook; import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.operators.KV; +import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.functions.StreamTableJoinFunction; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.Serde; import org.apache.samza.serializers.StringSerde; import org.apache.samza.storage.kv.RocksDbTableDescriptor; +import 
org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; import org.apache.samza.table.Table; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import samza.examples.cookbook.data.PageView; import samza.examples.cookbook.data.Profile; +import java.util.List; +import java.util.Map; + /** * In this example, we join a stream of Page views with a table of user profiles, which is populated from an * user profile stream. For instance, this is helpful for analysis that required additional information from @@ -73,29 +81,49 @@ import samza.examples.cookbook.data.Profile; * */ public class PageViewProfileTableJoiner implements StreamApplication { + private static final String KAFKA_SYSTEM_NAME = "kafka"; + private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181"); + private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092"); + private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1"); - private static final String PROFILE_TOPIC = "profile-table-input"; - private static final String PAGEVIEW_TOPIC = "pageview-join-input"; + private static final String PROFILE_STREAM_ID = "profile-table-input"; + private static final String PAGEVIEW_STREAM_ID = "pageview-join-input"; private static final String OUTPUT_TOPIC = "enriched-pageview-join-output"; @Override - public void init(StreamGraph graph, Config config) { - + public void describe(StreamApplicationDescriptor appDescriptor) { Serde<Profile> profileSerde = new JsonSerdeV2<>(Profile.class); Serde<PageView> pageViewSerde = new JsonSerdeV2<>(PageView.class); + Serde<EnrichedPageView> joinResultSerde = new JsonSerdeV2<>(EnrichedPageView.class); + + KafkaSystemDescriptor kafkaSystemDescriptor = new 
KafkaSystemDescriptor(KAFKA_SYSTEM_NAME) + .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT) + .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS) + .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS); + + KafkaInputDescriptor<Profile> profileInputDescriptor = + kafkaSystemDescriptor.getInputDescriptor(PROFILE_STREAM_ID, profileSerde); + KafkaInputDescriptor<PageView> pageViewInputDescriptor = + kafkaSystemDescriptor.getInputDescriptor(PAGEVIEW_STREAM_ID, pageViewSerde); + KafkaOutputDescriptor<EnrichedPageView> joinResultOutputDescriptor = + kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_TOPIC, joinResultSerde); + + RocksDbTableDescriptor<String, Profile> profileTableDescriptor = + new RocksDbTableDescriptor<String, Profile>("profile-table", KVSerde.of(new StringSerde(), profileSerde)); - OutputStream<EnrichedPageView> joinResultStream = graph.getOutputStream( - OUTPUT_TOPIC, new JsonSerdeV2<>(EnrichedPageView.class)); + appDescriptor.withDefaultSystem(kafkaSystemDescriptor); - Table profileTable = graph.getTable(new RocksDbTableDescriptor<String, Profile>("profile-table") - .withSerde(KVSerde.of(new StringSerde(), profileSerde))); + MessageStream<Profile> profileStream = appDescriptor.getInputStream(profileInputDescriptor); + MessageStream<PageView> pageViewStream = appDescriptor.getInputStream(pageViewInputDescriptor); + OutputStream<EnrichedPageView> joinResultStream = appDescriptor.getOutputStream(joinResultOutputDescriptor); + Table<KV<String, Profile>> profileTable = appDescriptor.getTable(profileTableDescriptor); - graph.getInputStream(PROFILE_TOPIC, profileSerde) + profileStream .map(profile -> KV.of(profile.userId, profile)) .sendTo(profileTable); - graph.getInputStream(PAGEVIEW_TOPIC, pageViewSerde) - .partitionBy(pv -> pv.userId, pv -> pv, new KVSerde(new StringSerde(), pageViewSerde), "join") + pageViewStream + .partitionBy(pv -> pv.userId, pv -> pv, KVSerde.of(new StringSerde(), pageViewSerde), "join") .join(profileTable, new 
JoinFn()) .sendTo(joinResultStream); } http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/b6acf190/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java index 2bcd9f5..fb17974 100644 --- a/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java +++ b/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java @@ -19,21 +19,27 @@ package samza.examples.cookbook; import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.windows.Windows; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.Serde; import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import samza.examples.cookbook.data.PageView; import samza.examples.cookbook.data.UserPageViews; import java.time.Duration; -import java.util.function.Function; +import java.util.List; +import java.util.Map; /** * In this example, we group page views by userId into sessions, and compute the number of page views for each user @@ -70,21 +76,34 @@ import java.util.function.Function; * */ public class PageViewSessionizerApp implements StreamApplication { + private static final String KAFKA_SYSTEM_NAME = "kafka"; + 
private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181"); + private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092"); + private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1"); - private static final String INPUT_TOPIC = "pageview-session-input"; - private static final String OUTPUT_TOPIC = "pageview-session-output"; + private static final String INPUT_STREAM_ID = "pageview-session-input"; + private static final String OUTPUT_STREAM_ID = "pageview-session-output"; @Override - public void init(StreamGraph graph, Config config) { + public void describe(StreamApplicationDescriptor appDescriptor) { Serde<String> stringSerde = new StringSerde(); - Serde<PageView> pageviewSerde = new JsonSerdeV2<>(PageView.class); - KVSerde<String, PageView> pageViewKVSerde = KVSerde.of(stringSerde, pageviewSerde); - Serde<UserPageViews> userPageviewSerde = new JsonSerdeV2<>(UserPageViews.class); - graph.setDefaultSerde(pageViewKVSerde); + Serde<KV<String, PageView>> pageViewKVSerde = KVSerde.of(stringSerde, new JsonSerdeV2<>(PageView.class)); + Serde<KV<String, UserPageViews>> userPageViewSerde = KVSerde.of(stringSerde, new JsonSerdeV2<>(UserPageViews.class)); + + KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(KAFKA_SYSTEM_NAME) + .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT) + .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS) + .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS); + + KafkaInputDescriptor<KV<String, PageView>> pageViewInputDescriptor = + kafkaSystemDescriptor.getInputDescriptor(INPUT_STREAM_ID, pageViewKVSerde); + KafkaOutputDescriptor<KV<String, UserPageViews>> userPageViewsOutputDescriptor = + kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, userPageViewSerde); + + appDescriptor.withDefaultSystem(kafkaSystemDescriptor); - MessageStream<KV<String, PageView>> pageViews 
= graph.getInputStream(INPUT_TOPIC); - OutputStream<KV<String, UserPageViews>> userPageViews = - graph.getOutputStream(OUTPUT_TOPIC, KVSerde.of(stringSerde, userPageviewSerde)); + MessageStream<KV<String, PageView>> pageViews = appDescriptor.getInputStream(pageViewInputDescriptor); + OutputStream<KV<String, UserPageViews>> userPageViews = appDescriptor.getOutputStream(userPageViewsOutputDescriptor); pageViews .partitionBy(kv -> kv.value.userId, kv -> kv.value, "pageview") http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/b6acf190/src/main/java/samza/examples/cookbook/StockPriceTableJoiner.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/StockPriceTableJoiner.java b/src/main/java/samza/examples/cookbook/StockPriceTableJoiner.java index cb735d2..3aa951e 100644 --- a/src/main/java/samza/examples/cookbook/StockPriceTableJoiner.java +++ b/src/main/java/samza/examples/cookbook/StockPriceTableJoiner.java @@ -18,19 +18,27 @@ */ package samza.examples.cookbook; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + import java.io.Serializable; import java.net.URL; import java.time.Duration; +import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.operators.KV; +import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.functions.StreamTableJoinFunction; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaOutputDescriptor; 
+import org.apache.samza.system.kafka.KafkaSystemDescriptor; import org.apache.samza.table.Table; import org.apache.samza.table.caching.CachingTableDescriptor; import org.apache.samza.table.remote.RemoteTableDescriptor; @@ -87,6 +95,10 @@ import org.codehaus.jackson.annotate.JsonProperty; * */ public class StockPriceTableJoiner implements StreamApplication { + private static final String KAFKA_SYSTEM_NAME = "kafka"; + private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181"); + private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092"); + private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1"); /** * Default API key "demo" only works for symbol "MSFT"; however you can get an @@ -97,27 +109,37 @@ public class StockPriceTableJoiner implements StreamApplication { private static final String URL_TEMPLATE = "https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol=%s&apikey=" + API_KEY; - private static final String INPUT_TOPIC = "stock-symbol-input"; - private static final String OUTPUT_TOPIC = "stock-price-output"; + private static final String INPUT_STREAM_ID = "stock-symbol-input"; + private static final String OUTPUT_STREAM_ID = "stock-price-output"; @Override - public void init(StreamGraph graph, Config config) { - - Table remoteTable = graph.getTable(new RemoteTableDescriptor("remote-table") - .withReadRateLimit(10) - .withReadFunction(new StockPriceReadFunction())); - - Table table = graph.getTable(new CachingTableDescriptor("table") - .withTable(remoteTable) - .withReadTtl(Duration.ofSeconds(5))); - - OutputStream<StockPrice> joinResultStream = graph.getOutputStream( - OUTPUT_TOPIC, new JsonSerdeV2<>(StockPrice.class)); - - graph.getInputStream(INPUT_TOPIC, new StringSerde()) + public void describe(StreamApplicationDescriptor appDescriptor) { + KafkaSystemDescriptor kafkaSystemDescriptor = new 
KafkaSystemDescriptor(KAFKA_SYSTEM_NAME) + .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT) + .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS) + .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS); + + KafkaInputDescriptor<String> stockSymbolInputDescriptor = + kafkaSystemDescriptor.getInputDescriptor(INPUT_STREAM_ID, new StringSerde()); + KafkaOutputDescriptor<StockPrice> stockPriceOutputDescriptor = + kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, new JsonSerdeV2<>(StockPrice.class)); + MessageStream<String> stockSymbolStream = appDescriptor.getInputStream(stockSymbolInputDescriptor); + OutputStream<StockPrice> stockPriceStream = appDescriptor.getOutputStream(stockPriceOutputDescriptor); + + RemoteTableDescriptor<String, Double> remoteTableDescriptor = + new RemoteTableDescriptor("remote-table") + .withReadRateLimit(10) + .withReadFunction(new StockPriceReadFunction()); + CachingTableDescriptor<String, Double> cachedRemoteTableDescriptor = + new CachingTableDescriptor<String, Double>("cached-remote-table") + .withTable(remoteTableDescriptor) + .withReadTtl(Duration.ofSeconds(5)); + Table<KV<String, Double>> cachedRemoteTable = appDescriptor.getTable(cachedRemoteTableDescriptor); + + stockSymbolStream .map(symbol -> new KV<String, Void>(symbol, null)) - .join(table, new JoinFn()) - .sendTo(joinResultStream); + .join(cachedRemoteTable, new JoinFn()) + .sendTo(stockPriceStream); } @@ -155,6 +177,11 @@ public class StockPriceTableJoiner implements StreamApplication { } }); } + + @Override + public boolean isRetriable(Throwable throwable) { + return false; + } } static class StockPrice implements Serializable { http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/b6acf190/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java 
b/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java index acf1411..69a8e7d 100644 --- a/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java +++ b/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java @@ -19,20 +19,27 @@ package samza.examples.cookbook; import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.windows.Windows; import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import samza.examples.cookbook.data.PageView; import samza.examples.cookbook.data.UserPageViews; import java.time.Duration; +import java.util.List; +import java.util.Map; /** * In this example, we group a stream of page views by country, and compute the number of page views over a tumbling time @@ -71,17 +78,32 @@ import java.time.Duration; * */ public class TumblingPageViewCounterApp implements StreamApplication { + private static final String KAFKA_SYSTEM_NAME = "kafka"; + private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181"); + private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092"); + private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1"); - private static 
final String INPUT_TOPIC = "pageview-tumbling-input"; - private static final String OUTPUT_TOPIC = "pageview-tumbling-output"; + private static final String INPUT_STREAM_ID = "pageview-tumbling-input"; + private static final String OUTPUT_STREAM_ID = "pageview-tumbling-output"; @Override - public void init(StreamGraph graph, Config config) { - graph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class))); + public void describe(StreamApplicationDescriptor appDescriptor) { + KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(KAFKA_SYSTEM_NAME) + .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT) + .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS) + .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS); + + KVSerde<String, PageView> pageViewSerde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class)); + KVSerde<String, UserPageViews> userPageViewSerde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageViews.class)); + + KafkaInputDescriptor<KV<String, PageView>> pageViewInputDescriptor = + kafkaSystemDescriptor.getInputDescriptor(INPUT_STREAM_ID, pageViewSerde); + KafkaOutputDescriptor<KV<String, UserPageViews>> userPageViewOutputDescriptor = + kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, userPageViewSerde); - MessageStream<KV<String, PageView>> pageViews = graph.getInputStream(INPUT_TOPIC); - OutputStream<KV<String, UserPageViews>> outputStream = - graph.getOutputStream(OUTPUT_TOPIC, KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageViews.class))); + appDescriptor.withDefaultSystem(kafkaSystemDescriptor); + MessageStream<KV<String, PageView>> pageViews = appDescriptor.getInputStream(pageViewInputDescriptor); + OutputStream<KV<String, UserPageViews>> outputStream = appDescriptor.getOutputStream(userPageViewOutputDescriptor); pageViews .partitionBy(kv -> kv.value.userId, kv -> kv.value, "userId") 
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/b6acf190/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java index 032608f..734df96 100644 --- a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java +++ b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java @@ -20,34 +20,40 @@ package samza.examples.wikipedia.application; import com.google.common.collect.ImmutableList; +import java.io.Serializable; import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.config.Config; import org.apache.samza.metrics.Counter; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.functions.FoldLeftFunction; +import org.apache.samza.operators.functions.SupplierFunction; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.Windows; import org.apache.samza.serializers.JsonSerdeV2; -import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.serializers.Serde; import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.system.kafka.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; import org.apache.samza.task.TaskContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import com.google.common.collect.ImmutableMap; import samza.examples.wikipedia.model.WikipediaParser; import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent; +import samza.examples.wikipedia.system.WikipediaInputDescriptor; +import 
samza.examples.wikipedia.system.WikipediaSystemDescriptor; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.time.Duration; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -70,40 +76,42 @@ import java.util.Set; * <li>Send the window output to Kafka</li> * </ul> * - * All of this application logic is defined in the {@link #init(StreamGraph, Config)} method, which + * All of this application logic is defined in the {@link #describe(StreamApplicationDescriptor)} method, which * is invoked by the framework to load the application. */ -public class WikipediaApplication implements StreamApplication { - private static final Logger log = LoggerFactory.getLogger(WikipediaApplication.class); - - // Inputs - private static final String WIKIPEDIA_STREAM_ID = "en-wikipedia"; - private static final String WIKTIONARY_STREAM_ID = "en-wiktionary"; - private static final String WIKINEWS_STREAM_ID = "en-wikinews"; - - // Outputs - private static final String STATS_STREAM_ID = "wikipedia-stats"; +public class WikipediaApplication implements StreamApplication, Serializable { + private static final Logger LOG = LoggerFactory.getLogger(WikipediaApplication.class); - // Stores - private static final String STATS_STORE_NAME = "wikipedia-stats"; - - // Metrics - private static final String EDIT_COUNT_KEY = "count-edits-all-time"; + private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181"); + private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092"); + private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1"); @Override - public void init(StreamGraph graph, Config config) { - // Messages come from 
WikipediaConsumer so we know that they don't have a key and don't need to be deserialized. - graph.setDefaultSerde(new NoOpSerde<>()); - - // Inputs - // Messages come from WikipediaConsumer so we know the type is WikipediaFeedEvent - MessageStream<WikipediaFeedEvent> wikipediaEvents = graph.getInputStream(WIKIPEDIA_STREAM_ID); - MessageStream<WikipediaFeedEvent> wiktionaryEvents = graph.getInputStream(WIKTIONARY_STREAM_ID); - MessageStream<WikipediaFeedEvent> wikiNewsEvents = graph.getInputStream(WIKINEWS_STREAM_ID); - - // Output (also un-keyed) - OutputStream<WikipediaStatsOutput> wikipediaStats = - graph.getOutputStream(STATS_STREAM_ID, new JsonSerdeV2<>(WikipediaStatsOutput.class)); + public void describe(StreamApplicationDescriptor appDescriptor) { + WikipediaSystemDescriptor wikipediaSystemDescriptor = new WikipediaSystemDescriptor("irc.wikimedia.org", 6667); + WikipediaInputDescriptor wikipediaInputDescriptor = wikipediaSystemDescriptor + .getInputDescriptor("en-wikipedia") + .withChannel("#en.wikipedia"); + WikipediaInputDescriptor wiktionaryInputDescriptor = wikipediaSystemDescriptor + .getInputDescriptor("en-wiktionary") + .withChannel("#en.wiktionary"); + WikipediaInputDescriptor wikiNewsInputDescriptor = wikipediaSystemDescriptor + .getInputDescriptor("en-wikinews") + .withChannel("#en.wikinews"); + + KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor("kafka") + .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT) + .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS) + .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS); + + KafkaOutputDescriptor<WikipediaStatsOutput> statsOutputDescriptor = + kafkaSystemDescriptor.getOutputDescriptor("wikipedia-stats", new JsonSerdeV2<>(WikipediaStatsOutput.class)); + + appDescriptor.withDefaultSystem(kafkaSystemDescriptor); + MessageStream<WikipediaFeedEvent> wikipediaEvents = appDescriptor.getInputStream(wikipediaInputDescriptor); + MessageStream<WikipediaFeedEvent> wiktionaryEvents 
= appDescriptor.getInputStream(wiktionaryInputDescriptor); + MessageStream<WikipediaFeedEvent> wikiNewsEvents = appDescriptor.getInputStream(wikiNewsInputDescriptor); + OutputStream<WikipediaStatsOutput> wikipediaStats = appDescriptor.getOutputStream(statsOutputDescriptor); // Merge inputs MessageStream<WikipediaFeedEvent> allWikipediaEvents = @@ -112,8 +120,8 @@ public class WikipediaApplication implements StreamApplication { // Parse, update stats, prepare output, and send allWikipediaEvents .map(WikipediaParser::parseEvent) - .window(Windows.tumblingWindow(Duration.ofSeconds(10), WikipediaStats::new, - new WikipediaStatsAggregator(), WikipediaStats.serde()), "statsWindow") + .window(Windows.tumblingWindow(Duration.ofSeconds(10), + WikipediaStats::new, new WikipediaStatsAggregator(), WikipediaStats.serde()), "statsWindow") .map(this::formatOutput) .sendTo(wikipediaStats); } @@ -123,12 +131,13 @@ public class WikipediaApplication implements StreamApplication { * * Uses a KeyValueStore to persist a total edit count across restarts. */ - private class WikipediaStatsAggregator implements FoldLeftFunction<Map<String, Object>, WikipediaStats> { + private static class WikipediaStatsAggregator implements FoldLeftFunction<Map<String, Object>, WikipediaStats> { + private static final String EDIT_COUNT_KEY = "count-edits-all-time"; - private KeyValueStore<String, Integer> store; + private transient KeyValueStore<String, Integer> store; // Example metric. Running counter of the number of repeat edits of the same title within a single window. 
- private Counter repeatEdits; + private transient Counter repeatEdits; /** * {@inheritDoc} @@ -137,7 +146,7 @@ public class WikipediaApplication implements StreamApplication { */ @Override public void init(Config config, TaskContext context) { - store = (KeyValueStore<String, Integer>) context.getStore(STATS_STORE_NAME); + store = (KeyValueStore<String, Integer>) context.getStore("wikipedia-stats"); repeatEdits = context.getMetricsRegistry().newCounter("edit-counters", "repeat-edits"); } @@ -165,7 +174,7 @@ public class WikipediaApplication implements StreamApplication { if (!newTitle) { repeatEdits.inc(); - log.info("Frequent edits for title: {}", edit.get("title")); + LOG.info("Frequent edits for title: {}", edit.get("title")); } return stats; } http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/b6acf190/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java index 51dd28f..e3982b1 100644 --- a/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java +++ b/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java @@ -23,7 +23,6 @@ import joptsimple.OptionSet; import org.apache.samza.config.Config; import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.util.CommandLine; -import org.apache.samza.util.Util; /** @@ -45,10 +44,9 @@ public class WikipediaZkLocalApplication { OptionSet options = cmdLine.parser().parse(args); Config config = cmdLine.loadConfig(options); - LocalApplicationRunner runner = new LocalApplicationRunner(config); WikipediaApplication app = new WikipediaApplication(); - - runner.run(app); + LocalApplicationRunner runner = new LocalApplicationRunner(app, config); + runner.run(); 
runner.waitForFinish(); } } http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/b6acf190/src/main/java/samza/examples/wikipedia/model/WikipediaParser.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/model/WikipediaParser.java b/src/main/java/samza/examples/wikipedia/model/WikipediaParser.java index 9347962..398c54c 100644 --- a/src/main/java/samza/examples/wikipedia/model/WikipediaParser.java +++ b/src/main/java/samza/examples/wikipedia/model/WikipediaParser.java @@ -38,6 +38,7 @@ public class WikipediaParser { } catch (Exception e) { System.err.println("Unable to parse line: " + wikipediaFeedEvent); } + return parsedJsonObject; } http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/b6acf190/src/main/java/samza/examples/wikipedia/system/WikipediaInputDescriptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/system/WikipediaInputDescriptor.java b/src/main/java/samza/examples/wikipedia/system/WikipediaInputDescriptor.java new file mode 100644 index 0000000..92de60d --- /dev/null +++ b/src/main/java/samza/examples/wikipedia/system/WikipediaInputDescriptor.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package samza.examples.wikipedia.system; + +import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.serializers.NoOpSerde; +import org.apache.samza.serializers.Serde; +import samza.examples.wikipedia.application.WikipediaApplication; + + +public class WikipediaInputDescriptor extends InputDescriptor<WikipediaFeed.WikipediaFeedEvent, WikipediaInputDescriptor> { + // Messages come from WikipediaConsumer so we know that they don't have a key and don't need to be deserialized. + private static final Serde SERDE = new NoOpSerde(); + + WikipediaInputDescriptor(String streamId, SystemDescriptor systemDescriptor) { + super(streamId, SERDE, systemDescriptor, null); + } + + public WikipediaInputDescriptor withChannel(String channel) { + withPhysicalName(channel); + return this; + } +} http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/b6acf190/src/main/java/samza/examples/wikipedia/system/WikipediaSystemDescriptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/system/WikipediaSystemDescriptor.java b/src/main/java/samza/examples/wikipedia/system/WikipediaSystemDescriptor.java new file mode 100644 index 0000000..6f50196 --- /dev/null +++ b/src/main/java/samza/examples/wikipedia/system/WikipediaSystemDescriptor.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package samza.examples.wikipedia.system; + +import java.util.Map; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; + +public class WikipediaSystemDescriptor extends SystemDescriptor<WikipediaSystemDescriptor> { + private static final String SYSTEM_NAME = "wikipedia"; + private static final String FACTORY_CLASS_NAME = WikipediaSystemFactory.class.getName(); + private static final String HOST_KEY = "systems.%s.host"; + private static final String PORT_KEY = "systems.%s.port"; + + private final String host; + private final int port; + + public WikipediaSystemDescriptor(String host, int port) { + super(SYSTEM_NAME, FACTORY_CLASS_NAME, null, null); + this.host = host; + this.port = port; + } + + public WikipediaInputDescriptor getInputDescriptor(String streamId) { + return new WikipediaInputDescriptor(streamId, this); + } + + @Override + public Map<String, String> toConfig() { + Map<String, String> configs = super.toConfig(); + configs.put(String.format(HOST_KEY, getSystemName()), host); + configs.put(String.format(PORT_KEY, getSystemName()), Integer.toString(port)); + return configs; + } +}