apex-malhar git commit: APEXMALHAR-2205 #resolve #comment State management benchmark
Repository: apex-malhar Updated Branches: refs/heads/master f006ac6f5 -> c5a12e4e7 APEXMALHAR-2205 #resolve #comment State management benchmark Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c5a12e4e Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c5a12e4e Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c5a12e4e Branch: refs/heads/master Commit: c5a12e4e747c5be840a16e4c932cbc1dbff79894 Parents: f006ac6 Author: brightchenAuthored: Fri Aug 26 16:09:12 2016 -0700 Committer: brightchen Committed: Thu Sep 1 15:00:29 2016 -0700 -- benchmark/pom.xml | 5 + .../state/ManagedStateBenchmarkApp.java | 215 +++ .../benchmark/state/StoreOperator.java | 127 +++ .../state/ManagedStateBenchmarkAppTester.java | 70 ++ 4 files changed, 417 insertions(+) -- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c5a12e4e/benchmark/pom.xml -- diff --git a/benchmark/pom.xml b/benchmark/pom.xml index f09ae81..d5451b9 100644 --- a/benchmark/pom.xml +++ b/benchmark/pom.xml @@ -595,6 +595,11 @@ + + joda-time + joda-time + 2.9.4 + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c5a12e4e/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java -- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java new file mode 100644 index 000..25e3971 --- /dev/null +++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.benchmark.state; + +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.Random; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.managed.ManagedStateImpl; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Stats.OperatorStats; +import com.datatorrent.api.StatsListener; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.lib.util.KeyValPair; + +@ApplicationAnnotation(name = "ManagedStateBenchmark") +public class ManagedStateBenchmarkApp implements StreamingApplication +{ + private static final Logger logger = LoggerFactory.getLogger(ManagedStateBenchmarkApp.class); + + protected static final String PROP_STORE_PATH = "dt.application.ManagedStateBenchmark.storeBasePath"; + protected static final String DEFAULT_BASE_PATH = "ManagedStateBenchmark/Store"; + 
+ @Override + public void populateDAG(DAG dag, Configuration conf) + { +TestStatsListener sl = new TestStatsListener(); +sl.adjustRate = conf.getBoolean("dt.ManagedStateBenchmark.adjustRate", false); +TestGenerator gen = dag.addOperator("Generator", new TestGenerator()); +dag.setAttribute(gen, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl)); + +StoreOperator storeOperator = new StoreOperator(); +storeOperator.setStore(createStore(conf)); +StoreOperator store = dag.addOperator("Store", storeOperator); + +dag.setAttribute(store, OperatorContext.STATS_LISTENERS,
[1/2] apex-core git commit: APEXCORE-515 Providing principal for token refresh
Repository: apex-core Updated Branches: refs/heads/master ae0ec2464 -> c13b0dd41 APEXCORE-515 Providing principal for token refresh Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/dd5e95a0 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/dd5e95a0 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/dd5e95a0 Branch: refs/heads/master Commit: dd5e95a090214bdcff3232cbdcad3dc59c24eff9 Parents: d651edc Author: Pramod ImmaneniAuthored: Wed Aug 24 17:25:25 2016 -0700 Committer: Pramod Immaneni Committed: Thu Sep 1 12:26:43 2016 -0700 -- .../stram/StreamingAppMasterService.java| 3 +- .../stram/client/StramAppLauncher.java | 28 +- .../stram/engine/StreamingContainer.java| 3 +- .../stram/plan/logical/LogicalPlan.java | 1 + .../stram/security/StramUserLogin.java | 20 + .../stram/client/StramAppLauncherTest.java | 30 ++-- 6 files changed, 54 insertions(+), 31 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/dd5e95a0/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index 43ab743..15b6402 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -676,6 +676,7 @@ public class StreamingAppMasterService extends CompositeService long tokenLifeTime = (long)(dag.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * Math.min(dag.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME), dag.getValue(LogicalPlan.RM_TOKEN_LIFE_TIME))); long expiryTime = System.currentTimeMillis() + tokenLifeTime; LOG.debug(" expiry token time {}", tokenLifeTime); +String principal = dag.getValue(LogicalPlan.PRINCIPAL); String hdfsKeyTabFile = dag.getValue(LogicalPlan.KEY_TAB_FILE); // Register self 
with ResourceManager @@ -753,7 +754,7 @@ public class StreamingAppMasterService extends CompositeService if (UserGroupInformation.isSecurityEnabled() && currentTimeMillis >= expiryTime && hdfsKeyTabFile != null) { String applicationId = appAttemptID.getApplicationId().toString(); -expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, hdfsKeyTabFile, credentials, rmAddress, true); +expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, principal, hdfsKeyTabFile, credentials, rmAddress, true); } if (currentTimeMillis > nodeReportUpdateTime) { http://git-wip-us.apache.org/repos/asf/apex-core/blob/dd5e95a0/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java index 5024c38..619252f 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java +++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java @@ -560,14 +560,12 @@ public class StramAppLauncher return cl; } - private void setTokenRefreshKeytab(LogicalPlan dag, Configuration conf) throws IOException + private void setTokenRefreshCredentials(LogicalPlan dag, Configuration conf) throws IOException { -String keytabPath; -if ((keytabPath = conf.get(StramClientUtils.KEY_TAB_FILE)) == null) { - String keytab; - if ((keytab = StramUserLogin.getKeytab()) == null) { -keytab = conf.get(StramUserLogin.DT_AUTH_KEYTAB); - } +String principal = StramUserLogin.getPrincipal(); +String keytabPath = conf.get(StramClientUtils.KEY_TAB_FILE); +if (keytabPath == null) { + String keytab = StramUserLogin.getKeytab(); if (keytab != null) { Path localKeyTabPath = new Path(keytab); try (FileSystem fs = StramClientUtils.newFileSystemInstance(conf)) { @@ -579,10 +577,11 @@ public class StramAppLauncher } } } 
-if (keytabPath != null) { +if ((principal != null) && (keytabPath != null)) { + dag.setAttribute(LogicalPlan.PRINCIPAL, principal); dag.setAttribute(LogicalPlan.KEY_TAB_FILE, keytabPath); } else { - LOG.warn("No keytab specified for refreshing tokens, application may not be able to run indefinitely"); + LOG.warn("Credentials
apex-malhar git commit: APEXMALHAR-2201 Suppressed console output in tests of Stream API.
Repository: apex-malhar Updated Branches: refs/heads/master 8f00cefa2 -> f006ac6f5 APEXMALHAR-2201 Suppressed console output in tests of Stream API. Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/f006ac6f Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/f006ac6f Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/f006ac6f Branch: refs/heads/master Commit: f006ac6f557340fe620839debec2f076e1e291af Parents: 8f00cef Author: ShunxinAuthored: Wed Aug 31 11:18:56 2016 -0700 Committer: Shunxin Committed: Thu Sep 1 13:05:52 2016 -0700 -- .../malhar/stream/sample/MinimalWordCount.java | 2 +- .../malhar/stream/sample/WindowedWordCount.java | 53 .../stream/sample/complete/AutoComplete.java| 53 .../sample/complete/TopWikipediaSessions.java | 1 + .../stream/sample/complete/TrafficRoutes.java | 2 +- .../sample/cookbook/CombinePerKeyExamples.java | 30 +++ .../stream/sample/cookbook/DeDupExample.java| 5 +- .../stream/sample/MinimalWordCountTest.java | 2 +- .../stream/sample/WindowedWordCountTest.java| 3 +- .../sample/complete/AutoCompleteTest.java | 3 +- .../complete/TopWikipediaSessionsTest.java | 1 + .../sample/complete/TrafficRoutesTest.java | 1 + .../cookbook/CombinePerKeyExamplesTest.java | 6 +-- .../sample/cookbook/DeDupExampleTest.java | 2 +- .../apex/malhar/stream/api/ApexStream.java | 6 +++ .../malhar/stream/api/impl/ApexStreamImpl.java | 9 .../stream/sample/ApplicationWithStreamAPI.java | 6 ++- .../sample/ApplicationWithStreamAPITest.java| 2 + .../apex/malhar/stream/sample/MyStreamTest.java | 2 +- 19 files changed, 124 insertions(+), 65 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java -- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java 
b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java index 21afc5b..03579ab 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java @@ -117,7 +117,7 @@ public class MinimalWordCount implements StreamingApplication } }, name("FormatResults")) // Print the result. -.print() +.print(name("console")) // Attach a collector to the stream to collect results. .endWith(collector, collector.input, name("Collector")) // populate the dag using the stream. http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java -- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java index c8a0e51..f020ddf 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java @@ -64,17 +64,11 @@ public class WindowedWordCount implements StreamingApplication */ public static class TextInput extends BaseOperator implements InputOperator { -private static boolean done = false; - public final transient DefaultOutputPort output = new DefaultOutputPort<>(); +private boolean done = false; private transient BufferedReader reader; -public static boolean isDone() -{ - return done; -} - @Override public void setup(Context.OperatorContext context) { @@ -101,20 +95,21 @@ public class WindowedWordCount implements StreamingApplication @Override public void emitTuples() { - try { -String line = reader.readLine(); -if (line == null) { - done = true; - reader.close(); - Thread.sleep(1000); -} else { - this.output.emit(line); + if (!done) { +try { + 
String line = reader.readLine(); + if (line == null) { +done = true; +reader.close(); + } else { +this.output.emit(line); + } + Thread.sleep(50); +} catch (IOException ex) { + throw new RuntimeException(ex); +} catch
[apex-core] Git Push Summary
Repository: apex-core Updated Branches: refs/heads/APEXCORE-515 [deleted] c13b0dd41
[1/2] apex-core git commit: APEXCORE-515 Providing principal for token refresh
Repository: apex-core Updated Branches: refs/heads/APEXCORE-515 [created] c13b0dd41 APEXCORE-515 Providing principal for token refresh Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/dd5e95a0 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/dd5e95a0 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/dd5e95a0 Branch: refs/heads/APEXCORE-515 Commit: dd5e95a090214bdcff3232cbdcad3dc59c24eff9 Parents: d651edc Author: Pramod ImmaneniAuthored: Wed Aug 24 17:25:25 2016 -0700 Committer: Pramod Immaneni Committed: Thu Sep 1 12:26:43 2016 -0700 -- .../stram/StreamingAppMasterService.java| 3 +- .../stram/client/StramAppLauncher.java | 28 +- .../stram/engine/StreamingContainer.java| 3 +- .../stram/plan/logical/LogicalPlan.java | 1 + .../stram/security/StramUserLogin.java | 20 + .../stram/client/StramAppLauncherTest.java | 30 ++-- 6 files changed, 54 insertions(+), 31 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/dd5e95a0/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index 43ab743..15b6402 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -676,6 +676,7 @@ public class StreamingAppMasterService extends CompositeService long tokenLifeTime = (long)(dag.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * Math.min(dag.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME), dag.getValue(LogicalPlan.RM_TOKEN_LIFE_TIME))); long expiryTime = System.currentTimeMillis() + tokenLifeTime; LOG.debug(" expiry token time {}", tokenLifeTime); +String principal = dag.getValue(LogicalPlan.PRINCIPAL); String hdfsKeyTabFile = dag.getValue(LogicalPlan.KEY_TAB_FILE); // Register 
self with ResourceManager @@ -753,7 +754,7 @@ public class StreamingAppMasterService extends CompositeService if (UserGroupInformation.isSecurityEnabled() && currentTimeMillis >= expiryTime && hdfsKeyTabFile != null) { String applicationId = appAttemptID.getApplicationId().toString(); -expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, hdfsKeyTabFile, credentials, rmAddress, true); +expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, principal, hdfsKeyTabFile, credentials, rmAddress, true); } if (currentTimeMillis > nodeReportUpdateTime) { http://git-wip-us.apache.org/repos/asf/apex-core/blob/dd5e95a0/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java index 5024c38..619252f 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java +++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java @@ -560,14 +560,12 @@ public class StramAppLauncher return cl; } - private void setTokenRefreshKeytab(LogicalPlan dag, Configuration conf) throws IOException + private void setTokenRefreshCredentials(LogicalPlan dag, Configuration conf) throws IOException { -String keytabPath; -if ((keytabPath = conf.get(StramClientUtils.KEY_TAB_FILE)) == null) { - String keytab; - if ((keytab = StramUserLogin.getKeytab()) == null) { -keytab = conf.get(StramUserLogin.DT_AUTH_KEYTAB); - } +String principal = StramUserLogin.getPrincipal(); +String keytabPath = conf.get(StramClientUtils.KEY_TAB_FILE); +if (keytabPath == null) { + String keytab = StramUserLogin.getKeytab(); if (keytab != null) { Path localKeyTabPath = new Path(keytab); try (FileSystem fs = StramClientUtils.newFileSystemInstance(conf)) { @@ -579,10 +577,11 @@ public class StramAppLauncher } } 
} -if (keytabPath != null) { +if ((principal != null) && (keytabPath != null)) { + dag.setAttribute(LogicalPlan.PRINCIPAL, principal); dag.setAttribute(LogicalPlan.KEY_TAB_FILE, keytabPath); } else { - LOG.warn("No keytab specified for refreshing tokens, application may not be able to run indefinitely"); +
[2/2] apex-core git commit: Merge branch 'APEXCORE-515' of https://github.com/PramodSSImmaneni/apex-core into APEXCORE-515
Merge branch 'APEXCORE-515' of https://github.com/PramodSSImmaneni/apex-core into APEXCORE-515 Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/c13b0dd4 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/c13b0dd4 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/c13b0dd4 Branch: refs/heads/APEXCORE-515 Commit: c13b0dd417e805601169f090fdeebf5b42c78651 Parents: ae0ec24 dd5e95a Author: Vlad Rozov Authored: Thu Sep 1 12:54:37 2016 -0700 Committer: Vlad Rozov Committed: Thu Sep 1 12:54:37 2016 -0700 -- .../stram/StreamingAppMasterService.java| 3 +- .../stram/client/StramAppLauncher.java | 28 +- .../stram/engine/StreamingContainer.java| 3 +- .../stram/plan/logical/LogicalPlan.java | 1 + .../stram/security/StramUserLogin.java | 20 + .../stram/client/StramAppLauncherTest.java | 30 ++-- 6 files changed, 54 insertions(+), 31 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/c13b0dd4/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java --
[08/12] apex-malhar git commit: Updated algo & working on math operators
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexTest.java -- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexTest.java new file mode 100644 index 000..337cefb --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexTest.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.Sink; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * @deprecated + * Functional tests for {@link InvertIndex} + * + */ +@Deprecated +public class InvertIndexTest +{ + private static Logger log = LoggerFactory.getLogger(InvertIndexTest.class); + + /** + * Test oper logic emits correct results + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testNodeProcessing() throws Exception + { +InvertIndexoper = new InvertIndex (); +CollectorTestSink indexSink = new CollectorTestSink(); + +Sink inSink = oper.data.getSink(); +oper.index.setSink(indexSink); + +oper.beginWindow(0); + +HashMap input = new HashMap (); + +input.put("a", "str"); +input.put("b", "str"); +inSink.put(input); + +input.clear(); +input.put("a", "str1"); +input.put("b", "str1"); +inSink.put(input); + +input.clear(); +input.put("c", "blah"); +inSink.put(input); + +input.clear(); +input.put("c", "str1"); +inSink.put(input); +oper.endWindow(); + +Assert.assertEquals("number emitted tuples", 3, indexSink.collectedTuples.size()); +for (Object o: indexSink.collectedTuples) { + log.debug(o.toString()); + HashMap output = (HashMap )o; + for (Map.Entry e: output.entrySet()) { +String key = e.getKey(); +ArrayList alist = e.getValue(); +if (key.equals("str1")) { + Assert.assertEquals("Index for \"str1\" contains \"a\"", true, alist.contains("a")); + Assert.assertEquals("Index for \"str1\" contains \"b\"", true, alist.contains("b")); + Assert.assertEquals("Index for \"str1\" contains \"c\"", true, alist.contains("c")); + +} else if (key.equals("str")) { + Assert.assertEquals("Index for \"str1\" contains \"a\"", true, alist.contains("a")); + Assert.assertEquals("Index for \"str1\" contains \"b\"", 
true, alist.contains("b")); + Assert.assertEquals("Index for \"str1\" contains \"c\"", false, alist.contains("c")); + +} else if (key.equals("blah")) { + Assert.assertEquals("Index for \"str1\" contains \"a\"", false, alist.contains("a")); + Assert.assertEquals("Index for \"str1\" contains \"b\"", false, alist.contains("b")); + Assert.assertEquals("Index for \"str1\" contains \"c\"", true, alist.contains("c")); +} + } +} + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LastMatchMapTest.java -- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LastMatchMapTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LastMatchMapTest.java new file mode 100644 index 000..c930932 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LastMatchMapTest.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright
[02/12] apex-malhar git commit: Updated algo & working on math operators
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/AverageTest.java -- diff --git a/library/src/test/java/com/datatorrent/lib/math/AverageTest.java b/library/src/test/java/com/datatorrent/lib/math/AverageTest.java index 1973bec..712bcdc 100644 --- a/library/src/test/java/com/datatorrent/lib/math/AverageTest.java +++ b/library/src/test/java/com/datatorrent/lib/math/AverageTest.java @@ -21,6 +21,8 @@ package com.datatorrent.lib.math; import org.junit.Assert; import org.junit.Test; +import com.datatorrent.common.util.Pair; + import com.datatorrent.lib.testbench.CollectorTestSink; /** @@ -96,7 +98,7 @@ public class AverageTest Assert.assertEquals("number emitted tuples", 1, averageSink.collectedTuples.size()); for (Object o : averageSink.collectedTuples) { // count is 12 - Integer val = ((Number)o).intValue(); + Number val = ((Pair)o).getFirst().intValue(); Assert.assertEquals("emitted average value was was ", new Integer(1157 / 12), val); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/ChangeAlertKeyValTest.java -- diff --git a/library/src/test/java/com/datatorrent/lib/math/ChangeAlertKeyValTest.java b/library/src/test/java/com/datatorrent/lib/math/ChangeAlertKeyValTest.java deleted file mode 100644 index 7d7842e..000 --- a/library/src/test/java/com/datatorrent/lib/math/ChangeAlertKeyValTest.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.math; - -import org.junit.Assert; - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.lib.testbench.CollectorTestSink; -import com.datatorrent.lib.util.KeyValPair; - -/** - * - * Functional tests for {@link com.datatorrent.lib.math.ChangeAlertKeyVal}. - * - * - */ -public class ChangeAlertKeyValTest -{ - private static Logger log = LoggerFactory - .getLogger(ChangeAlertKeyValTest.class); - - /** - * Test node logic emits correct results. - */ - @Test - public void testNodeProcessing() throws Exception - { -testNodeProcessingSchema(new ChangeAlertKeyVal()); -testNodeProcessingSchema(new ChangeAlertKeyVal ()); -testNodeProcessingSchema(new ChangeAlertKeyVal ()); -testNodeProcessingSchema(new ChangeAlertKeyVal ()); -testNodeProcessingSchema(new ChangeAlertKeyVal ()); - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - public void testNodeProcessingSchema( - ChangeAlertKeyVal oper) - { -CollectorTestSink alertSink = new CollectorTestSink(); - -oper.alert.setSink(alertSink); -oper.setPercentThreshold(5); - -oper.beginWindow(0); -oper.data.process(new KeyValPair ("a", oper.getValue(200))); -oper.data.process(new KeyValPair ("b", oper.getValue(10))); -oper.data.process(new KeyValPair ("c", oper.getValue(100))); - -oper.data.process(new KeyValPair ("a", oper.getValue(203))); -oper.data.process(new KeyValPair ("b", oper.getValue(12))); -oper.data.process(new KeyValPair ("c", oper.getValue(101))); - -oper.data.process(new KeyValPair ("a", oper.getValue(210))); 
-oper.data.process(new KeyValPair ("b", oper.getValue(12))); -oper.data.process(new KeyValPair ("c", oper.getValue(102))); - -oper.data.process(new KeyValPair ("a", oper.getValue(231))); -oper.data.process(new KeyValPair ("b", oper.getValue(18))); -oper.data.process(new KeyValPair ("c", oper.getValue(103))); -oper.endWindow(); - -// One for a, Two for b -Assert.assertEquals("number emitted tuples", 3, -alertSink.collectedTuples.size()); - -double aval = 0; -double bval = 0; -log.debug("\nLogging tuples"); -for (Object o : alertSink.collectedTuples) { - KeyValPair
[11/12] apex-malhar git commit: Updated algo & working on math operators
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyValueMap.java -- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyValueMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyValueMap.java new file mode 100644 index 000..78eb6d9 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyValueMap.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.HashMap; + +import com.datatorrent.api.DefaultOutputPort; + +import com.datatorrent.lib.util.AbstractBaseFrequentKeyValueMap; + +/** + * This operator filters the incoming stream of key value pairs by finding the value or values (if there is a tie), + * for each key, that occur the fewest number of times within each window. + * Each key and its corresponding least values are emitted at the end of each window. + * + * Occurrences of all values for each key is counted and at the end of window the least frequent values are emitted on output port least per key. 
+ * + * + * This module is an end of window module + * + * Ports: + * data: expects MapK,V + * least: Output port, emits HashMapK,HashMapV,Integer(1) + * + * Properties: None + * + * Compile time checks: None + * Specific run time checks: None + * + * Benchmarks: Blast as many tuples as possible in inline mode + * + * In-BoundOut-boundComments + * 30 Million K,V pairs/sEmits only 1 tuple per window per keyIn-bound throughput is the main determinant of performance. + * The benchmark was done with immutable objects. If K or V are mutable the benchmark may be lower + * + * + * + * Function Table (K=String,V=Integer);: + * + * Tuple Type (api)In-bound (process)Out-bound (emit) + * data(MapK,V)least(HashMapK,HashMapInteger) + * Begin Window (beginWindow())N/AN/A + * Data (process()){a=1,b=5,c=110} + * Data (process()){a=55,c=2000,b=45} + * Data (process()){d=2} + * Data (process()){a=55,b=5,c=22} + * Data (process()){h=20,a=2,z=5} + * Data (process()){a=4,c=110} + * Data (process()){a=4,z=5} + * End Window (endWindow())N/A{a={1=1,2=1},b={45=1},c={2000=1,22=1},d={2=1},h={20=1},z={5=2} + * + * + * + * + * + * @displayName Emit Least Frequent Keyval Pair + * @category Rules and Alerts + * @tags filter, key value, count + * @deprecated + * @since 0.3.2 + */ +@Deprecated +public class LeastFrequentKeyValueMapextends AbstractBaseFrequentKeyValueMap +{ + /** + * The output port on which the least frequent key value pairs are emitted. 
+ */ + public final transient DefaultOutputPort >> least = new DefaultOutputPort >>(); + + /** + * returns val1 < val2 + * @param val1 + * @param val2 + * @return val1 < val2 + */ + @Override + public boolean compareValue(int val1, int val2) + { +return (val1 < val2); + } + + /** + * Emits tuple on port "least" + * @param tuple + */ + @Override + public void emitTuple(HashMap > tuple) + { +least.emit(tuple); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyMap.java -- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyMap.java new file mode 100644 index 000..f1ab968 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyMap.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the +
[05/12] apex-malhar git commit: Updated algo & working on math operators
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java -- diff --git a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java deleted file mode 100644 index b0d2e77..000 --- a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.math; - -import java.util.HashMap; - -import javax.validation.constraints.Min; - -import org.apache.commons.lang.mutable.MutableDouble; - -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.lib.util.BaseNumberKeyValueOperator; -import com.datatorrent.lib.util.KeyValPair; - -/** - * Operator compares consecutive values arriving at input port mapped by keys, emits key,percent change pair on output alert port if percent change exceeds percentage threshold set in operator. - * - * StateFull : Yes, current key/value is stored in operator for comparison in - * next successive windows. 
- * Partition(s): No, base comparison value will be inconsistent across - * instantiated copies. - * - * Ports: - * data: expects KeyValPairK,V extends Number - * alert: emits KeyValPairK,KeyValPairV,Double(1) - * - * Properties: - * threshold: The threshold of change between consecutive tuples of the - * same key that triggers an alert tuple - * inverse: if set to true the key in the filter will block tuple - * filterBy: List of keys to filter on - * @displayName Change Alert Key Value - * @category Rules and Alerts - * @tags change, key value, numeric, percentage - * @since 0.3.3 - */ -public class ChangeAlertKeyValextends -BaseNumberKeyValueOperator -{ - /** - * Base map is a StateFull field. It is retained across windows - */ - private HashMap basemap = new HashMap (); - - /** - * Input data port that takes a key value pair. - */ - public final transient DefaultInputPort > data = new DefaultInputPort >() - { -/** - * Process each key, compute change or percent, and emit it. - */ -@Override -public void process(KeyValPair tuple) -{ - K key = tuple.getKey(); - double tval = tuple.getValue().doubleValue(); - MutableDouble val = basemap.get(key); - if (!doprocessKey(key)) { -return; - } - if (val == null) { // Only process keys that are in the basemap -val = new MutableDouble(tval); -basemap.put(cloneKey(key), val); -return; - } - double change = tval - val.doubleValue(); - double percent = (change / val.doubleValue()) * 100; - if (percent < 0.0) { -percent = 0.0 - percent; - } - if (percent > percentThreshold) { -KeyValPair dmap = new KeyValPair ( -cloneValue(tuple.getValue()), percent); -KeyValPair > otuple = new KeyValPair >( -cloneKey(key), dmap); -alert.emit(otuple); - } - val.setValue(tval); -} - }; - - /** - * Key,Percent Change output port. - */ - public final transient DefaultOutputPort >> alert = new DefaultOutputPort >>(); - - /** - * Alert thresh hold percentage set by application. 
- */ - @Min(1) - private double percentThreshold = 0.0; - - /** - * getter function for threshold value - * - * @return threshold value - */ - @Min(1) - public double getPercentThreshold() - { -return percentThreshold; - } - - /** - * setter function for threshold value - */ - public void setPercentThreshold(double d) - { -percentThreshold = d; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java
[09/12] apex-malhar git commit: Updated algo & working on math operators
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/CountFunction.java -- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/CountFunction.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/CountFunction.java new file mode 100644 index 000..ac83885 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/CountFunction.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.function; + +import java.util.ArrayList; +import java.util.Map; + +import javax.validation.constraints.NotNull; + +import org.apache.commons.lang.StringUtils; + +/** + * An implementation of function index that implements sql count function semantic. + * + * Counts number of values of given column and returns count of non null values in column. + * e.g : sql => SELECT COUNT(column_name) FROM table_name. + * + *Properties : + *column : column name for values count. + *alias : Alias name for aggregate output. 
+ * @displayName Count Function + * @category Stream Manipulators + * @tags sql count + * @since 0.3.4 + * @deprecated + */ +@Deprecated +public class CountFunction extends FunctionIndex +{ + /** + * @param column column for values count, must be non null. + * @param alias Alias name for aggregate output. + */ + public CountFunction(@NotNull String column, String alias) + { +super(column, alias); + } + + /** + * Count number of values of given column. + * @return Count of non null values in column. + */ + @Override + public Object compute(ArrayList
[12/12] apex-malhar git commit: Updated algo & working on math operators
Updated algo & working on math operators Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/8f00cefa Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/8f00cefa Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/8f00cefa Branch: refs/heads/master Commit: 8f00cefa2a14756a65e0baad48db0d52e2fe66a3 Parents: 2e47b4c Author: Lakshmi Prasanna Velineni Authored: Wed Aug 24 20:54:53 2016 -0700 Committer: Lakshmi Prasanna Velineni Committed: Thu Sep 1 09:12:43 2016 -0700 -- .../datatorrent/apps/logstream/Application.java | 5 +- .../contrib/sqlite/SqliteStreamOperator.java| 4 +- .../misc/algo/AbstractStreamPatternMatcher.java | 174 +++ .../contrib/misc/algo/AllAfterMatchMap.java | 120 .../malhar/contrib/misc/algo/DistinctMap.java | 113 +++ .../malhar/contrib/misc/algo/FilterKeyVals.java | 163 ++ .../contrib/misc/algo/FilterKeysHashMap.java| 184 +++ .../malhar/contrib/misc/algo/FilterKeysMap.java | 196 .../malhar/contrib/misc/algo/FirstMatchMap.java | 117 +++ .../apex/malhar/contrib/misc/algo/FirstN.java | 113 +++ .../contrib/misc/algo/FirstTillMatch.java | 116 +++ .../contrib/misc/algo/InsertSortDesc.java | 136 + .../malhar/contrib/misc/algo/InvertIndex.java | 146 + .../contrib/misc/algo/InvertIndexArray.java | 130 .../malhar/contrib/misc/algo/LastMatchMap.java | 112 +++ .../contrib/misc/algo/LeastFrequentKeyMap.java | 149 + .../misc/algo/LeastFrequentKeyValueMap.java | 107 +++ .../contrib/misc/algo/MostFrequentKeyMap.java | 142 + .../misc/algo/MostFrequentKeyValueMap.java | 110 +++ .../apex/malhar/contrib/misc/algo/Sampler.java | 121 .../apex/malhar/contrib/misc/math/Change.java | 119 .../malhar/contrib/misc/math/ChangeAlert.java | 120 .../contrib/misc/math/ChangeAlertKeyVal.java| 131 .../contrib/misc/math/ChangeAlertMap.java | 125 .../malhar/contrib/misc/math/ChangeKeyVal.java | 125 .../contrib/misc/math/CompareExceptMap.java | 131
.../malhar/contrib/misc/math/CompareMap.java| 88 ++ .../malhar/contrib/misc/math/CountKeyVal.java | 116 +++ .../malhar/contrib/misc/math/ExceptMap.java | 104 +++ .../apex/malhar/contrib/misc/math/Quotient.java | 111 +++ .../malhar/contrib/misc/math/QuotientMap.java | 239 +++ .../malhar/contrib/misc/math/SumCountMap.java | 305 +++ .../streamquery/AbstractSqlStreamOperator.java | 192 .../misc/streamquery/DeleteOperator.java| 88 ++ .../streamquery/DerbySqlStreamOperator.java | 200 .../misc/streamquery/GroupByHavingOperator.java | 230 ++ .../misc/streamquery/InnerJoinOperator.java | 212 + .../misc/streamquery/OrderByOperator.java | 181 +++ .../contrib/misc/streamquery/OrderByRule.java | 99 ++ .../misc/streamquery/OuterJoinOperator.java | 123 .../streamquery/SelectFunctionOperator.java | 129 .../misc/streamquery/SelectOperator.java| 113 +++ .../misc/streamquery/SelectTopOperator.java | 131 .../misc/streamquery/UpdateOperator.java| 111 +++ .../streamquery/condition/BetweenCondition.java | 107 +++ .../condition/CompoundCondition.java| 132 .../condition/EqualValueCondition.java | 99 ++ .../condition/HavingCompareValue.java | 79 + .../streamquery/condition/HavingCondition.java | 58 .../misc/streamquery/condition/InCondition.java | 94 ++ .../streamquery/condition/LikeCondition.java| 105 +++ .../streamquery/function/AverageFunction.java | 82 + .../streamquery/function/CountFunction.java | 87 ++ .../streamquery/function/FirstLastFunction.java | 113 +++ .../streamquery/function/FunctionIndex.java | 95 ++ .../streamquery/function/MaxMinFunction.java| 105 +++ .../misc/streamquery/function/SumFunction.java | 64 .../streamquery/index/BinaryExpression.java | 75 + .../misc/streamquery/index/MidIndex.java| 82 + .../streamquery/index/NegateExpression.java | 61 .../streamquery/index/RoundDoubleIndex.java | 64 .../misc/streamquery/index/StringCaseIndex.java | 66 .../misc/streamquery/index/StringLenIndex.java | 60 .../misc/streamquery/index/SumExpression.java | 65 
.../misc/streamquery/index/UnaryExpression.java | 78 + .../contrib/misc/streamquery/package-info.java | 23 ++ .../algo/AbstractStreamPatternMatcherTest.java | 173 +++