[storm] branch master updated (1453009 -> ab0b750)
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/storm.git. from 1453009 STORM-2720 : Add TIMESTAMP option for FirstPollOffset for Kafka Trident spout (#2911) add 9711d9a STORM-3392 Topology page should show components even if workers haven't started new ab0b750 Merge branch 'YSTORM-5412' of https://github.com/dandsager1/storm into STORM-3392-merge The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../org/apache/storm/daemon/nimbus/Nimbus.java | 84 ++ 1 file changed, 84 insertions(+)
[storm] 01/01: Merge branch 'YSTORM-5412' of https://github.com/dandsager1/storm into STORM-3392-merge
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git commit ab0b75084dc70232efd5d67a4ec8465902e34228 Merge: 1453009 9711d9a Author: Jungtaek Lim (HeartSaVioR) AuthorDate: Fri May 17 23:14:47 2019 +0900 Merge branch 'YSTORM-5412' of https://github.com/dandsager1/storm into STORM-3392-merge .../org/apache/storm/daemon/nimbus/Nimbus.java | 84 ++ 1 file changed, 84 insertions(+)
[storm] branch master updated: STORM-2720 : Add TIMESTAMP option for FirstPollOffset for Kafka Trident spout (#2911)
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git The following commit(s) were added to refs/heads/master by this push: new 1453009 STORM-2720 : Add TIMESTAMP option for FirstPollOffset for Kafka Trident spout (#2911) 1453009 is described below commit 14530099b2accd462f3af815eee82d3d1b407c0a Author: Janith AuthorDate: Fri May 17 05:42:01 2019 -0700 STORM-2720 : Add TIMESTAMP option for FirstPollOffset for Kafka Trident spout (#2911) --- .../storm/kafka/spout/FirstPollOffsetStrategy.java | 12 +++- .../spout/internal/CommonKafkaSpoutConfig.java | 22 ++- .../spout/trident/KafkaTridentSpoutEmitter.java| 24 +++ .../trident/KafkaTridentSpoutEmitterEmitTest.java | 73 ++ 4 files changed, 116 insertions(+), 15 deletions(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/FirstPollOffsetStrategy.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/FirstPollOffsetStrategy.java index f2e14cb..9184bc3 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/FirstPollOffsetStrategy.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/FirstPollOffsetStrategy.java @@ -32,11 +32,21 @@ public enum FirstPollOffsetStrategy { */ LATEST, /** + * The kafka spout polls records starting at the earliest offset whose timestamp is greater than or equal to the given startTimestamp. + * This setting only takes effect on topology deployment. This option is currently available only for the Trident Spout + */ +TIMESTAMP, +/** * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as EARLIEST */ UNCOMMITTED_EARLIEST, /** * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as LATEST */ -UNCOMMITTED_LATEST; +UNCOMMITTED_LATEST, +/** + * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as TIMESTAMP. + * This option is currently available only for the Trident Spout + */ +UNCOMMITTED_TIMESTAMP; } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommonKafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommonKafkaSpoutConfig.java index 4e1160f..14c6a02 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommonKafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommonKafkaSpoutConfig.java @@ -45,7 +45,8 @@ import org.slf4j.LoggerFactory; public abstract class CommonKafkaSpoutConfig implements Serializable { public static final long DEFAULT_POLL_TIMEOUT_MS = 200; public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000; - +// Earliest start +public static final long DEFAULT_START_TS = 0L; public static final FirstPollOffsetStrategy DEFAULT_FIRST_POLL_OFFSET_STRATEGY = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; public static final Logger LOG = LoggerFactory.getLogger(CommonKafkaSpoutConfig.class); @@ -60,7 +61,8 @@ public abstract class CommonKafkaSpoutConfig implements Serializable { private final RecordTranslator translator; private final FirstPollOffsetStrategy firstPollOffsetStrategy; private final long partitionRefreshPeriodMs; - +private final long startTimeStamp; + /** * Creates a new CommonKafkaSpoutConfig using a Builder. * @@ -74,6 +76,7 @@ public abstract class CommonKafkaSpoutConfig implements Serializable { this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy; this.pollTimeoutMs = builder.pollTimeoutMs; this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs; +this.startTimeStamp = builder.startTimeStamp; } public abstract static class Builder> { @@ -85,6 +88,7 @@ public abstract class CommonKafkaSpoutConfig implements Serializable { private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS; private FirstPollOffsetStrategy firstPollOffsetStrategy = DEFAULT_FIRST_POLL_OFFSET_STRATEGY; private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS; +private long startTimeStamp = DEFAULT_START_TS; public Builder(String bootstrapServers, String... topics) { this(bootstrapServers, new NamedTopicFilter(topics), new RoundRobinManualPartitioner()); @@ -207,6 +211,15 @@ public abstract class CommonKafkaSpoutConfig implements Serializable { this.partitionRefreshPeriodMs = partitionRefreshPeriodMs; return
[storm] branch master updated: [STORM-3385] Avoid two threads reading from the same input stream of a process
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git The following commit(s) were added to refs/heads/master by this push: new eacd235 [STORM-3385] Avoid two threads reading from the same input stream of a process new 50ae20d Merge branch 'STORM-3385' of https://github.com/Ethanlm/storm into STORM-3385-merge eacd235 is described below commit eacd235639da54810060fe61c5cd4904c2ebd3a9 Author: Ethan Li AuthorDate: Wed May 1 09:29:44 2019 -0500 [STORM-3385] Avoid two threads reading from the same input stream of a process --- .../jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java index c8192d3..8ee8b63 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java @@ -64,7 +64,7 @@ public class ClientSupervisorUtils { final Map environment, final String logPreFix) throws IOException { int ret = 0; -Process process = processLauncher(conf, user, null, args, environment, logPreFix, null, null); +Process process = processLauncher(conf, user, null, args, environment, null, null, null); if (StringUtils.isNotBlank(logPreFix)) { Utils.readAndLogStream(logPreFix, process.getInputStream()); }
[storm] branch master updated (fabfcd6 -> 8951e7a)
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/storm.git. from fabfcd6 Merge branch 'STORM-3371' of https://github.com/srdo/storm into STORM-3371-merge add c884fe0 STORM-3373: Use Log4j BOM, align SLF4j dependencies, upgrade SLF4j new 8951e7a Merge branch 'STORM-3373' of https://github.com/srdo/storm into STORM-3373-merge The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: pom.xml | 23 +++ 1 file changed, 7 insertions(+), 16 deletions(-)
[storm] 01/01: Merge branch 'STORM-3373' of https://github.com/srdo/storm into STORM-3373-merge
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git commit 8951e7a89344100be6a6845b3cf500967f1bb930 Merge: fabfcd6 c884fe0 Author: Jungtaek Lim (HeartSaVioR) AuthorDate: Sun May 5 18:06:48 2019 +0900 Merge branch 'STORM-3373' of https://github.com/srdo/storm into STORM-3373-merge pom.xml | 23 +++ 1 file changed, 7 insertions(+), 16 deletions(-) diff --cc pom.xml index 27fcc6c,3e16208..0330bc0 --- a/pom.xml +++ b/pom.xml @@@ -281,12 -280,11 +281,11 @@@ 1.0-rc4 4.1.30.Final 1.0.2 - 1.6.6 - 2.11.1 - 1.7.21 + 2.11.2 + 1.7.26 3.2.6 2.19.0 -3.4.6 +3.4.14 0.9.94 2.3.4 2.8.5
[storm] 01/01: Merge branch 'STORM-3371' of https://github.com/srdo/storm into STORM-3371-merge
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git commit fabfcd63792cddcf171cee15484b033c4c301bfe Merge: 48fb1e4 52d2b3b Author: Jungtaek Lim (HeartSaVioR) AuthorDate: Sun May 5 18:05:47 2019 +0900 Merge branch 'STORM-3371' of https://github.com/srdo/storm into STORM-3371-merge .../jvm/org/apache/storm/task/IMetricsContext.java | 27 +++ .../jvm/org/apache/storm/task/TopologyContext.java | 7 - .../trident/operation/TridentOperationContext.java | 30 ++ 3 files changed, 63 insertions(+), 1 deletion(-)
[storm] branch master updated (48fb1e4 -> fabfcd6)
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/storm.git. from 48fb1e4 Merge branch 'STORM-3370' of https://github.com/srdo/storm into STORM-3370-merge add 52d2b3b STORM-3371: Add metrics v2 hooks to IMetricsContext new fabfcd6 Merge branch 'STORM-3371' of https://github.com/srdo/storm into STORM-3371-merge The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../jvm/org/apache/storm/task/IMetricsContext.java | 27 +++ .../jvm/org/apache/storm/task/TopologyContext.java | 7 - .../trident/operation/TridentOperationContext.java | 30 ++ 3 files changed, 63 insertions(+), 1 deletion(-)
[storm] 01/01: Merge branch 'STORM-3370' of https://github.com/srdo/storm into STORM-3370-merge
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git commit 48fb1e4af7a21fc15c5381f4c367435e7f181df8 Merge: 1c94048 74d4bc2 Author: Jungtaek Lim (HeartSaVioR) AuthorDate: Sun May 5 18:04:30 2019 +0900 Merge branch 'STORM-3370' of https://github.com/srdo/storm into STORM-3370-merge .../org/apache/storm/perf/JCQueuePerfTest.java | 25 +--- .../hdfs/bolt/format/TestSimpleFileNameFormat.java | 2 +- .../org/apache/storm/hdfs/spout/TestHdfsSpout.java | 7 +-- .../src/jvm/org/apache/storm/daemon/Task.java | 34 +-- .../jvm/org/apache/storm/daemon/worker/Worker.java | 9 +-- .../apache/storm/daemon/worker/WorkerState.java| 15 - .../apache/storm/daemon/worker/WorkerTransfer.java | 9 +-- .../apache/storm/metrics2/StormMetricRegistry.java | 66 ++ .../jvm/org/apache/storm/metrics2/TaskMetrics.java | 28 - .../jvm/org/apache/storm/task/TopologyContext.java | 31 +- .../src/jvm/org/apache/storm/utils/JCQueue.java| 4 +- .../storm/utils/JCQueueBackpressureTest.java | 3 +- .../jvm/org/apache/storm/utils/JCQueueTest.java| 3 +- .../src/main/java/org/apache/storm/Testing.java| 33 +-- 14 files changed, 142 insertions(+), 127 deletions(-)
[storm] branch master updated (1c94048 -> 48fb1e4)
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/storm.git. from 1c94048 Merge branch 'STORM-3386' of https://github.com/srdo/storm into asfgit-master add 74d4bc2 STORM-3370: Make StormMetricRegistry non-static, and supply it as a dependendency where it is used new 48fb1e4 Merge branch 'STORM-3370' of https://github.com/srdo/storm into STORM-3370-merge The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../org/apache/storm/perf/JCQueuePerfTest.java | 25 +--- .../hdfs/bolt/format/TestSimpleFileNameFormat.java | 2 +- .../org/apache/storm/hdfs/spout/TestHdfsSpout.java | 7 +-- .../src/jvm/org/apache/storm/daemon/Task.java | 34 +-- .../jvm/org/apache/storm/daemon/worker/Worker.java | 9 +-- .../apache/storm/daemon/worker/WorkerState.java| 15 - .../apache/storm/daemon/worker/WorkerTransfer.java | 9 +-- .../apache/storm/metrics2/StormMetricRegistry.java | 66 ++ .../jvm/org/apache/storm/metrics2/TaskMetrics.java | 28 - .../jvm/org/apache/storm/task/TopologyContext.java | 31 +- .../src/jvm/org/apache/storm/utils/JCQueue.java| 4 +- .../storm/utils/JCQueueBackpressureTest.java | 3 +- .../jvm/org/apache/storm/utils/JCQueueTest.java| 3 +- .../src/main/java/org/apache/storm/Testing.java| 33 +-- 14 files changed, 142 insertions(+), 127 deletions(-)
[storm] branch master updated: STORM-3362 fixing 'EventHubSpout uses a blocking receiver in nextTuple()' issue
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git The following commit(s) were added to refs/heads/master by this push: new ec0a99a STORM-3362 fixing 'EventHubSpout uses a blocking receiver in nextTuple()' issue new c4aa01a Merge branch 'fixing_blocking_call_in_EventHubSpout' of https://github.com/CaperAi/storm into STORM-3362-merge ec0a99a is described below commit ec0a99a2fd747c0fe53f8dc7acb6c229e0e44d44 Author: York Yang AuthorDate: Tue Mar 26 10:19:21 2019 -0400 STORM-3362 fixing 'EventHubSpout uses a blocking receiver in nextTuple()' issue --- .../org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java | 6 ++ .../org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java| 9 + 2 files changed, 15 insertions(+) diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java index 83dc850..c5d6303 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java @@ -23,6 +23,7 @@ import com.microsoft.azure.eventhubs.EventHubClient; import com.microsoft.azure.eventhubs.PartitionReceiver; import com.microsoft.azure.servicebus.ServiceBusException; import java.io.IOException; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -39,6 +40,7 @@ public class EventHubReceiverImpl implements IEventHubReceiver { private final String entityName; private final String partitionId; private final String consumerGroupName; +private final int receiverTimeoutInMillis; private PartitionReceiver receiver; private EventHubClient ehClient; @@ -51,6 +53,7 @@ public class EventHubReceiverImpl implements IEventHubReceiver { this.entityName = config.getEntityPath(); this.partitionId = partitionId; this.consumerGroupName = config.getConsumerGroupName(); +this.receiverTimeoutInMillis = config.getReceiverTimeoutInMillis(); receiveApiLatencyMean = new ReducedMetric(new MeanReducer()); receiveApiCallCount = new CountMetric(); receiveMessageCount = new CountMetric(); @@ -82,6 +85,9 @@ public class EventHubReceiverImpl implements IEventHubReceiver { throw new RuntimeException("Eventhub receiver must have " + "an offset or time to be created"); } +if (receiver != null) { + receiver.setReceiveTimeout(Duration.ofMillis(receiverTimeoutInMillis)); +} } catch (IOException e) { logger.error("Exception in creating ehclient" + e.toString()); throw new EventHubException(e); diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java index cd27b11..3e486b5 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java @@ -42,6 +42,7 @@ public class EventHubSpoutConfig implements Serializable { // disabling filter private String connectionString; private String topologyName; +private int receiverTimeoutInMillis = 1000; // default private IEventDataScheme scheme = new StringEventDataScheme(); private String consumerGroupName = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME; private String outputStreamId; @@ -252,4 +253,12 @@ public class EventHubSpoutConfig implements Serializable { public void setOutputStreamId(String outputStreamId) { this.outputStreamId = outputStreamId; } + +public int getReceiverTimeoutInMillis() { +return receiverTimeoutInMillis; +} + +public void setReceiverTimeoutInMillis(int receiverTimeoutInMillis) { +this.receiverTimeoutInMillis = receiverTimeoutInMillis; +} }
[storm] branch master updated: STORM-3354: Fix Nimbus not reentering leadership election in some cases, and quit leadership election cleanly when Nimbus shuts down
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git The following commit(s) were added to refs/heads/master by this push: new 27921e2 STORM-3354: Fix Nimbus not reentering leadership election in some cases, and quit leadership election cleanly when Nimbus shuts down new 7e98f29 Merge branch 'STORM-3354' of https://github.com/srdo/storm into STORM-3354-merge 27921e2 is described below commit 27921e208a1373f4f3d8b1639693510e23cb593b Author: Stig Rohde Døssing AuthorDate: Thu Mar 7 12:36:03 2019 +0100 STORM-3354: Fix Nimbus not reentering leadership election in some cases, and quit leadership election cleanly when Nimbus shuts down --- .../storm/messaging/netty/StormServerHandler.java | 2 +- .../org/apache/storm/nimbus/ILeaderElector.java| 17 +++-- .../src/jvm/org/apache/storm/utils/Utils.java | 27 .../apache/storm/testing/MockLeaderElector.java| 2 +- .../apache/storm/blobstore/LocalFsBlobStore.java | 2 +- .../org/apache/storm/daemon/nimbus/Nimbus.java | 6 +- .../storm/nimbus/LeaderListenerCallback.java | 26 --- .../apache/storm/zookeeper/LeaderElectorImp.java | 77 +--- .../zookeeper/LeaderListenerCallbackFactory.java | 81 ++ .../java/org/apache/storm/zookeeper/Zookeeper.java | 45 ++-- 10 files changed, 162 insertions(+), 123 deletions(-) diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java index 55e7058..3c256bb 100644 --- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java +++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java @@ -26,7 +26,7 @@ import org.slf4j.LoggerFactory; public class StormServerHandler extends ChannelInboundHandlerAdapter { private static final Logger LOG = LoggerFactory.getLogger(StormServerHandler.class); -private static final Set ALLOWED_EXCEPTIONS = new HashSet<>(Arrays.asList(new Class[]{ IOException.class })); +private static final Set> ALLOWED_EXCEPTIONS = new HashSet<>(Arrays.asList(new Class[]{ IOException.class })); private final IServer server; private final AtomicInteger failure_count; diff --git a/storm-client/src/jvm/org/apache/storm/nimbus/ILeaderElector.java b/storm-client/src/jvm/org/apache/storm/nimbus/ILeaderElector.java index 3fdb407..f0d877f 100644 --- a/storm-client/src/jvm/org/apache/storm/nimbus/ILeaderElector.java +++ b/storm-client/src/jvm/org/apache/storm/nimbus/ILeaderElector.java @@ -12,7 +12,6 @@ package org.apache.storm.nimbus; -import java.io.Closeable; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -21,7 +20,7 @@ import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting; /** * The interface for leader election. */ -public interface ILeaderElector extends Closeable { +public interface ILeaderElector extends AutoCloseable { /** * Method guaranteed to be called as part of initialization of leader elector instance. @@ -37,11 +36,11 @@ public interface ILeaderElector extends Closeable { void addToLeaderLockQueue() throws Exception; /** - * Removes the caller from the leader lock queue. If the caller is leader - * also releases the lock. This method can be called multiple times so it needs - * to be idempotent. + * Removes the caller from leadership election, relinquishing leadership if acquired, then requeues for leadership after the specified + * delay. + * @param delayMs The delay to wait before re-entering the election */ -void removeFromLeaderLockQueue() throws Exception; +void quitElectionFor(int delayMs) throws Exception; /** * Decide if the caller currently has the leader lock. @@ -51,7 +50,7 @@ public interface ILeaderElector extends Closeable { /** * Get the current leader's address. - * @return the current leader's address , may return null if no one has the lock. + * @return the current leader's address, may return null if no one has the lock. */ NimbusInfo getLeader(); @@ -71,9 +70,9 @@ public interface ILeaderElector extends Closeable { List getAllNimbuses() throws Exception; /** - * Method called to allow for cleanup. once close this object can not be reused. + * Method called to allow for cleanup. Relinquishes leadership if owned by the caller. */ @Override -void close(); +void close() throws Exception; } diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java index 102720f..8d15fc2 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java +++ b/storm-client/src/jvm/o
[storm] branch master updated: STORM-3357: Bump Clojure version to 1.10
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git The following commit(s) were added to refs/heads/master by this push: new 49cf9cc STORM-3357: Bump Clojure version to 1.10 new 0ea3c89 Merge branch 'STORM-3357' of https://github.com/srdo/storm into STORM-3357-merge 49cf9cc is described below commit 49cf9ccd087fd81e563172a9c695b0a409dcdf64 Author: Stig Rohde Døssing AuthorDate: Wed Mar 13 18:19:11 2019 +0100 STORM-3357: Bump Clojure version to 1.10 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index fefd3e7..25fca9c 100644 --- a/pom.xml +++ b/pom.xml @@ -257,7 +257,7 @@ true -1.7.0 +1.10.0 1.18 2.6 2.6
[storm] branch master updated: STORM-3356: Exclude slf4j-binding from Hive dependencies
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git The following commit(s) were added to refs/heads/master by this push: new 2cfbe07 STORM-3356: Exclude slf4j-binding from Hive dependencies new 8848bb8 Merge branch 'STORM-3356' of https://github.com/srdo/storm into STORM-3356-merge 2cfbe07 is described below commit 2cfbe073848ef7074a26f0929c934a4f283793fc Author: Stig Rohde Døssing AuthorDate: Tue Mar 12 12:43:48 2019 +0100 STORM-3356: Exclude slf4j-binding from Hive dependencies --- external/storm-autocreds/pom.xml | 8 external/storm-hive/pom.xml | 12 2 files changed, 20 insertions(+) diff --git a/external/storm-autocreds/pom.xml b/external/storm-autocreds/pom.xml index d819145..1851256 100644 --- a/external/storm-autocreds/pom.xml +++ b/external/storm-autocreds/pom.xml @@ -133,6 +133,10 @@ slf4j-log4j12 +org.apache.logging.log4j +log4j-slf4j-impl + + org.apache.calcite calcite-core @@ -156,6 +160,10 @@ slf4j-log4j12 +org.apache.logging.log4j +log4j-slf4j-impl + + org.apache.calcite calcite-avatica diff --git a/external/storm-hive/pom.xml b/external/storm-hive/pom.xml index 2fd8ae5..424fe53 100644 --- a/external/storm-hive/pom.xml +++ b/external/storm-hive/pom.xml @@ -72,6 +72,10 @@ slf4j-log4j12 +org.apache.logging.log4j +log4j-slf4j-impl + + org.apache.calcite calcite-core @@ -92,6 +96,10 @@ slf4j-log4j12 +org.apache.logging.log4j +log4j-slf4j-impl + + org.apache.calcite calcite-avatica @@ -111,6 +119,10 @@ slf4j-log4j12 +org.apache.logging.log4j +log4j-slf4j-impl + + org.apache.calcite calcite-core
[storm] branch master updated: STORM-3352: Lock Netty versions using Netty BOM
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git The following commit(s) were added to refs/heads/master by this push: new d4b53b7 STORM-3352: Lock Netty versions using Netty BOM new f335cce Merge branch 'STORM-3352' of https://github.com/srdo/storm into STORM-3352-merge d4b53b7 is described below commit d4b53b7d41a4b55975c48b5412581de74b9bd80c Author: Stig Rohde Døssing AuthorDate: Thu Mar 7 08:48:12 2019 +0100 STORM-3352: Lock Netty versions using Netty BOM --- pom.xml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 54a7520..5d61c92 100644 --- a/pom.xml +++ b/pom.xml @@ -946,8 +946,10 @@ io.netty -netty-all +netty-bom ${netty.version} +pom +import io.dropwizard.metrics
[storm] branch 1.1.x-branch updated: [MINOR] Update maven repository URL to https
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch 1.1.x-branch in repository https://gitbox.apache.org/repos/asf/storm.git The following commit(s) were added to refs/heads/1.1.x-branch by this push: new 8365e34 [MINOR] Update maven repository URL to https 8365e34 is described below commit 8365e34dbef5ccc2913874704812bc68196f4752 Author: Jungtaek Lim (HeartSaVioR) AuthorDate: Mon Mar 11 23:00:02 2019 +0900 [MINOR] Update maven repository URL to https --- .../src/main/java/org/apache/storm/submit/dependency/Booter.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/Booter.java b/external/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/Booter.java index 31a6c50..bce9ba9 100644 --- a/external/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/Booter.java +++ b/external/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/Booter.java @@ -47,7 +47,7 @@ public class Booter { } public static RemoteRepository newCentralRepository() { -return new RemoteRepository.Builder("central", "default", "http://repo1.maven.org/maven2/;).build(); +return new RemoteRepository.Builder("central", "default", "https://repo1.maven.org/maven2/;).build(); } public static RemoteRepository newLocalRepository() {
[storm] branch 1.x-branch updated: [MINOR] Update maven repository URL to https
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch 1.x-branch in repository https://gitbox.apache.org/repos/asf/storm.git The following commit(s) were added to refs/heads/1.x-branch by this push: new 79cceae [MINOR] Update maven repository URL to https new 2ade130 Merge branch '1.x-minor-update-maven-repository-to-https' into 1.x-branch 79cceae is described below commit 79cceae3d70643d48bb4b28d31d743ed9794f856 Author: Jungtaek Lim (HeartSaVioR) AuthorDate: Mon Mar 11 23:00:02 2019 +0900 [MINOR] Update maven repository URL to https --- .../src/main/java/org/apache/storm/submit/dependency/Booter.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/Booter.java b/external/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/Booter.java index 31a6c50..bce9ba9 100644 --- a/external/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/Booter.java +++ b/external/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/Booter.java @@ -47,7 +47,7 @@ public class Booter { } public static RemoteRepository newCentralRepository() { -return new RemoteRepository.Builder("central", "default", "http://repo1.maven.org/maven2/;).build(); +return new RemoteRepository.Builder("central", "default", "https://repo1.maven.org/maven2/;).build(); } public static RemoteRepository newLocalRepository() {
[storm] branch master updated: [MINOR] Update maven repository URL to https
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git The following commit(s) were added to refs/heads/master by this push: new e069c06 [MINOR] Update maven repository URL to https new ca3a17a Merge branch 'minor-update-maven-repository-to-https' e069c06 is described below commit e069c06657e60023094ee9c0540d7e10762409c4 Author: Jungtaek Lim (HeartSaVioR) AuthorDate: Mon Mar 11 23:00:02 2019 +0900 [MINOR] Update maven repository URL to https --- .../src/main/java/org/apache/storm/submit/dependency/Booter.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/Booter.java b/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/Booter.java index a9632e8..5b60faf 100644 --- a/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/Booter.java +++ b/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/Booter.java @@ -47,7 +47,7 @@ public class Booter { } public static RemoteRepository newCentralRepository() { -return new RemoteRepository.Builder("central", "default", "http://repo1.maven.org/maven2/;).build(); +return new RemoteRepository.Builder("central", "default", "https://repo1.maven.org/maven2/;).build(); } public static RemoteRepository newLocalRepository() {
[storm] 02/04: STORM-3344 clean up test code
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git commit 8a332e382e404d1341b09f7b77c74c5bbec43610 Author: Aaron Gresch AuthorDate: Fri Mar 15 10:33:30 2019 -0500 STORM-3344 clean up test code --- .../scheduler/blacklist/BlacklistScheduler.java| 2 +- .../scheduler/blacklist/FaultGenerateUtils.java| 2 +- .../blacklist/TestBlacklistScheduler.java | 22 .../blacklist/TestUtilsForBlacklistScheduler.java | 24 -- 4 files changed, 33 insertions(+), 17 deletions(-) diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java index 1061a3f..54c10f9 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java @@ -155,7 +155,7 @@ public class BlacklistScheduler implements IScheduler { Set newPorts = Sets.difference(supervisorPorts, cachedSupervisorPorts); if (newPorts.size() > 0) { -//add new ports to cached supervisor. We need a modifiable set to allow removing ports later. +// add new ports to cached supervisor. We need a modifiable set to allow removing ports later. Set allPorts = new HashSet<>(newPorts); allPorts.addAll(cachedSupervisorPorts); cachedSupervisors.put(supervisorKey, allPorts); diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java index c962039..9a34080 100644 --- a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java +++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java @@ -43,7 +43,7 @@ public class FaultGenerateUtils { supervisors = TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supervisors, "sup-" + supervisor); } else { for (int slot : slots) { -supervisors = TestUtilsForBlacklistScheduler.modifyPortFromSupervisors(supervisors, "sup-" + supervisor, slot, false); +supervisors = TestUtilsForBlacklistScheduler.removePortFromSupervisors(supervisors, "sup-" + supervisor, slot); } } } diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java index f55cbb1..96006b4 100644 --- a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java +++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java @@ -120,9 +120,11 @@ public class TestBlacklistScheduler { scheduler.prepare(config); scheduler.schedule(topologies, cluster); -cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.modifyPortFromSupervisors(supMap, "sup-0", 0, false), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config); +cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removePortFromSupervisors(supMap, +"sup-0", 0), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config); scheduler.schedule(topologies, cluster); -cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.modifyPortFromSupervisors(supMap, "sup-0", 0, false), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config); +cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removePortFromSupervisors(supMap, "sup-0", 0), + TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config); scheduler.schedule(topologies, cluster); cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap(), topologies, config); scheduler.schedule(topologies, cluster); @@ -319,7 +321,8 @@ public class TestBlacklistScheduler { scheduler = bs; bs.prepare(config); bs.schedule(topologies,cluster); -cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap,"sup-0"),TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssi
[storm] 03/04: STORM-3344 clean up test code
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git commit 0da9f18dd479a77a9d0e3ea575c001232f91fefe Author: Aaron Gresch AuthorDate: Fri Mar 15 10:36:26 2019 -0500 STORM-3344 clean up test code --- .../org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java index 96006b4..cd8caf3 100644 --- a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java +++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java @@ -321,7 +321,7 @@ public class TestBlacklistScheduler { scheduler = bs; bs.prepare(config); bs.schedule(topologies,cluster); -cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap,"sup-0"), +cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config); for (int i = 0 ; i < 20 ; i++){ bs.schedule(topologies,cluster);
[storm] 04/04: Merge branch 'agresch_STORM-3344' of https://github.com/agresch/storm into STORM-3344-merge
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git commit 77c4e2857175cb980f984bc1dbe19751d71f9f41 Merge: 18ead35 0da9f18 Author: Jungtaek Lim (HeartSaVioR) AuthorDate: Sat Mar 16 03:45:31 2019 +0900 Merge branch 'agresch_STORM-3344' of https://github.com/agresch/storm into STORM-3344-merge .../scheduler/blacklist/BlacklistScheduler.java| 14 +++--- .../strategies/DefaultBlacklistStrategy.java | 10 ++--- .../blacklist/TestBlacklistScheduler.java | 52 -- .../blacklist/TestUtilsForBlacklistScheduler.java | 16 +++ .../resource/TestResourceAwareScheduler.java | 2 +- 5 files changed, 79 insertions(+), 15 deletions(-)
[storm] branch master updated (18ead35 -> 77c4e28)
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/storm.git. from 18ead35 Merge branch 'STORM-3319' of https://github.com/srdo/storm into STORM-3319-merging add 8014f84 STORM-3344 prevent blacklist scheduler runtime exeception removing from set new cd10629 STORM-3344 add unit test to verify blacklist scheduler handles supervisors with changing ports new 8a332e3 STORM-3344 clean up test code new 0da9f18 STORM-3344 clean up test code new 77c4e28 Merge branch 'agresch_STORM-3344' of https://github.com/agresch/storm into STORM-3344-merge The 4 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../scheduler/blacklist/BlacklistScheduler.java| 14 +++--- .../strategies/DefaultBlacklistStrategy.java | 10 ++--- .../blacklist/TestBlacklistScheduler.java | 52 -- .../blacklist/TestUtilsForBlacklistScheduler.java | 16 +++ .../resource/TestResourceAwareScheduler.java | 2 +- 5 files changed, 79 insertions(+), 15 deletions(-)
[storm] 01/04: STORM-3344 add unit test to verify blacklist scheduler handles supervisors with changing ports
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git commit cd1062969bace192f1ea6985cb0aea92bd5aae3f Author: Aaron Gresch AuthorDate: Wed Feb 27 08:32:26 2019 -0600 STORM-3344 add unit test to verify blacklist scheduler handles supervisors with changing ports --- .../scheduler/blacklist/BlacklistScheduler.java| 8 ++-- .../strategies/DefaultBlacklistStrategy.java | 10 ++--- .../scheduler/blacklist/FaultGenerateUtils.java| 2 +- .../blacklist/TestBlacklistScheduler.java | 46 -- .../blacklist/TestUtilsForBlacklistScheduler.java | 8 +++- .../resource/TestResourceAwareScheduler.java | 2 +- 6 files changed, 61 insertions(+), 15 deletions(-) diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java index 73bc5a0..1061a3f 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java @@ -193,9 +193,11 @@ public class BlacklistScheduler implements IScheduler { for (String supervisor : supervisors) { int supervisorCount = supervisorCountMap.getOrDefault(supervisor, 0); Set slots = item.get(supervisor); -if (slots.equals(cachedSupervisors.get(supervisor))) { // treat supervisor as bad only if all slots are bad +if (slots.equals(cachedSupervisors.get(supervisor))) { // treat supervisor as bad only if all of its slots matched the cached supervisor +// track how many times a cached supervisor has been marked bad supervisorCountMap.put(supervisor, supervisorCount + 1); } +// track how many times each supervisor slot has been listed as bad for (Integer slot : slots) { WorkerSlot workerSlot = new WorkerSlot(supervisor, slot); int slotCount = slotCountMap.getOrDefault(workerSlot, 0); @@ -217,8 +219,8 @@ public class BlacklistScheduler implements IScheduler { WorkerSlot workerSlot = entry.getKey(); String supervisorKey = workerSlot.getNodeId(); Integer slot = workerSlot.getPort(); -int value = entry.getValue(); -if (value == windowSize) { // worker slot which was never back to normal in tolerance period will be removed from cache +int slotFailures = entry.getValue(); +if (slotFailures == windowSize) { // worker slot which was never back to normal in tolerance period will be removed from cache Set slots = cachedSupervisors.get(supervisorKey); if (slots != null) { // slots will be null while supervisor has been removed from cached supervisors slots.remove(slot); diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java index 7748d6f..8a36669 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java @@ -98,12 +98,12 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy { public void resumeFromBlacklist() { Set readyToRemove = new HashSet<>(); for (Map.Entry entry : blacklist.entrySet()) { -String key = entry.getKey(); -int value = entry.getValue() - 1; -if (value == 0) { -readyToRemove.add(key); +String supervisor = entry.getKey(); +int countUntilResume = entry.getValue() - 1; +if (countUntilResume == 0) { +readyToRemove.add(supervisor); } else { -blacklist.put(key, value); +blacklist.put(supervisor, countUntilResume); } } for (String key : readyToRemove) { diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java index 9a34080..c962039 100644 --- a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java +++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java @@ -43,7 +43,7 @@ public class FaultGenerateUtils { supervisors = TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supervisors, "sup-" + supervisor);
[storm] branch master updated: STORM-3319: Fix failing assertion in Slot
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git The following commit(s) were added to refs/heads/master by this push: new 947c7b2 STORM-3319: Fix failing assertion in Slot new 18ead35 Merge branch 'STORM-3319' of https://github.com/srdo/storm into STORM-3319-merging 947c7b2 is described below commit 947c7b22acf4cdf4f95001b26e700a590d675220 Author: Stig Rohde Døssing AuthorDate: Sun Jan 20 03:15:03 2019 +0100 STORM-3319: Fix failing assertion in Slot --- .../org/apache/storm/daemon/supervisor/Slot.java | 99 +- .../org/apache/storm/localizer/AsyncLocalizer.java | 10 +-- .../apache/storm/localizer/LocalizedResource.java | 2 +- .../apache/storm/localizer/LocallyCachedBlob.java | 54 +++- .../storm/localizer/LocallyCachedTopologyBlob.java | 2 +- .../apache/storm/daemon/supervisor/SlotTest.java | 80 + .../apache/storm/localizer/AsyncLocalizerTest.java | 12 +-- 7 files changed, 198 insertions(+), 61 deletions(-) diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java index 8b1b2f5..74c3e3d 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java @@ -128,13 +128,10 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback } } catch (ContainerRecoveryException e) { //We could not recover container will be null. +currentAssignment = null; } newAssignment = currentAssignment; -if (container == null) { -currentAssignment = null; -//Assigned something but it is not running -} } } @@ -155,7 +152,9 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback //In some cases the new LocalAssignment may be equivalent to the old, but // It is not equal. In those cases we want to update the current assignment to // be the same as the new assignment +//PRECONDITION: The new and current assignments must be equivalent private static DynamicState updateAssignmentIfNeeded(DynamicState dynamicState) { +assert equivalent(dynamicState.newAssignment, dynamicState.currentAssignment); if (dynamicState.newAssignment != null && !dynamicState.newAssignment.equals(dynamicState.currentAssignment)) { dynamicState = @@ -244,7 +243,10 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback */ private static DynamicState prepareForNewAssignmentNoWorkersRunning(DynamicState dynamicState, StaticState staticState) throws IOException { assert (dynamicState.container == null); +assert dynamicState.currentAssignment == null; +//We're either going to empty, or starting fresh blob download. Either way, the changing blob notifications are outdated. +dynamicState = drainAllChangingBlobs(dynamicState); if (dynamicState.newAssignment == null) { return dynamicState.withState(MachineState.EMPTY); } @@ -276,7 +278,6 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback pendingDownload = staticState.localizer.requestDownloadTopologyBlobs( dynamicState.newAssignment, staticState.port, staticState.changingCallback); } -dynamicState = drainAllChangingBlobs(dynamicState); next = dynamicState.withState(MachineState.KILL) .withPendingLocalization(dynamicState.newAssignment, pendingDownload); break; @@ -330,10 +331,13 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback /** * Drop all of the changingBlobs and pendingChangingBlobs. + * + * PRECONDITION: container is null * @param dynamicState current state. * @return the next state. */ private static DynamicState drainAllChangingBlobs(DynamicState dynamicState) { +assert dynamicState.container == null; if (!dynamicState.changingBlobs.isEmpty()) { for (BlobChanging rc : dynamicState.changingBlobs) { rc.latch.countDown(); @@ -416,6 +420,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback assert (dynamicState.pendingLocalization != null); assert (dynamicState.pendingDownload != null); assert (dynamicState.container == null); +assert dynamicState.currentAssignment == null; //Ignore changes to scheduli
[storm] branch 1.x-branch updated: [MINOR] Update repository URLs from http to https
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch 1.x-branch in repository https://gitbox.apache.org/repos/asf/storm.git The following commit(s) were added to refs/heads/1.x-branch by this push: new b735ea0 [MINOR] Update repository URLs from http to https new bc729eb Merge branch '1.x-branch-MINOR-http-repo-to-https' into 1.x-branch b735ea0 is described below commit b735ea0c31b42f78130a90c1dec4e818d85ad124 Author: Jungtaek Lim (HeartSaVioR) AuthorDate: Mon Feb 25 22:43:52 2019 +0900 [MINOR] Update repository URLs from http to https * confirmed modified URLs also exist --- external/storm-hdfs/pom.xml | 2 +- external/storm-mqtt/pom.xml | 2 +- pom.xml | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml index 3604929..8aae2b7 100644 --- a/external/storm-hdfs/pom.xml +++ b/external/storm-hdfs/pom.xml @@ -38,7 +38,7 @@ confluent -http://packages.confluent.io/maven +https://packages.confluent.io/maven diff --git a/external/storm-mqtt/pom.xml b/external/storm-mqtt/pom.xml index 02aabfc..fce5482 100644 --- a/external/storm-mqtt/pom.xml +++ b/external/storm-mqtt/pom.xml @@ -35,7 +35,7 @@ bintray -http://dl.bintray.com/andsel/maven/ +https://dl.bintray.com/andsel/maven/ true diff --git a/pom.xml b/pom.xml index cff8bc7..dfbc002 100644 --- a/pom.xml +++ b/pom.xml @@ -30,11 +30,11 @@ pom Storm Distributed and fault-tolerant realtime computation -http://storm.apache.org +https://storm.apache.org The Apache Software License, Version 2.0 -http://www.apache.org/licenses/LICENSE-2.0.txt +https://www.apache.org/licenses/LICENSE-2.0.txt @@ -1029,7 +1029,7 @@ false central -http://repo1.maven.org/maven2/ +https://repo1.maven.org/maven2/
[storm] branch 1.1.x-branch updated: [MINOR] Update repository URLs from http to https
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch 1.1.x-branch in repository https://gitbox.apache.org/repos/asf/storm.git The following commit(s) were added to refs/heads/1.1.x-branch by this push: new 1981c9b [MINOR] Update repository URLs from http to https new 2d3ba4c Merge branch '1.1.x-branch-MINOR-http-repo-to-https' into 1.1.x-branch 1981c9b is described below commit 1981c9b7565f00768e6447966f0fa911107c2638 Author: Jungtaek Lim (HeartSaVioR) AuthorDate: Mon Feb 25 22:43:52 2019 +0900 [MINOR] Update repository URLs from http to https * confirmed modified URLs also exist --- external/storm-hdfs/pom.xml | 2 +- external/storm-mqtt/pom.xml | 2 +- pom.xml | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml index a3633ac..cf52e62 100644 --- a/external/storm-hdfs/pom.xml +++ b/external/storm-hdfs/pom.xml @@ -38,7 +38,7 @@ confluent -http://packages.confluent.io/maven +https://packages.confluent.io/maven diff --git a/external/storm-mqtt/pom.xml b/external/storm-mqtt/pom.xml index ed55157..7087621 100644 --- a/external/storm-mqtt/pom.xml +++ b/external/storm-mqtt/pom.xml @@ -35,7 +35,7 @@ bintray -http://dl.bintray.com/andsel/maven/ +https://dl.bintray.com/andsel/maven/ true diff --git a/pom.xml b/pom.xml index 7e08c64..1267461 100644 --- a/pom.xml +++ b/pom.xml @@ -31,11 +31,11 @@ pom Storm Distributed and fault-tolerant realtime computation -http://storm.apache.org +https://storm.apache.org The Apache Software License, Version 2.0 -http://www.apache.org/licenses/LICENSE-2.0.txt +https://www.apache.org/licenses/LICENSE-2.0.txt @@ -1016,7 +1016,7 @@ false central -http://repo1.maven.org/maven2/ +https://repo1.maven.org/maven2/
[storm] branch master updated (0eb14a5 -> 78463c3)
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/storm.git. from 0eb14a5 Merge pull request #2959 from srdo/licenses add 87f125a STORM-3320: Ensure executors start when worker connections are ready new c4898b9 Merge branch 'STORM-3320' of https://github.com/srdo/storm into STORM-3320-merge add 35b294b STORM-3343: Fix flaky JCQueue test new 78463c3 Merge branch 'STORM-3343' of https://github.com/srdo/storm into STORM-3343-merge The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../org/apache/storm/daemon/worker/WorkerState.java | 19 --- .../src/jvm/org/apache/storm/executor/Executor.java | 3 +++ .../org/apache/storm/executor/bolt/BoltExecutor.java | 4 +++- .../apache/storm/executor/spout/SpoutExecutor.java| 4 +++- .../test/jvm/org/apache/storm/utils/JCQueueTest.java | 18 ++ 5 files changed, 27 insertions(+), 21 deletions(-)
[storm] 02/02: Merge branch 'STORM-3343' of https://github.com/srdo/storm into STORM-3343-merge
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git commit 78463c3403ba76d3a67ea622aa875f8d64710ca9 Merge: c4898b9 35b294b Author: Jungtaek Lim (HeartSaVioR) AuthorDate: Wed Feb 27 04:53:20 2019 +0900 Merge branch 'STORM-3343' of https://github.com/srdo/storm into STORM-3343-merge .../test/jvm/org/apache/storm/utils/JCQueueTest.java | 18 ++ 1 file changed, 6 insertions(+), 12 deletions(-)
[storm] 01/02: Merge branch 'STORM-3320' of https://github.com/srdo/storm into STORM-3320-merge
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git commit c4898b9c2d4ec89b1da702e5c4c2b5969065436e Merge: 0eb14a5 87f125a Author: Jungtaek Lim (HeartSaVioR) AuthorDate: Wed Feb 27 04:52:50 2019 +0900 Merge branch 'STORM-3320' of https://github.com/srdo/storm into STORM-3320-merge .../org/apache/storm/daemon/worker/WorkerState.java | 19 --- .../src/jvm/org/apache/storm/executor/Executor.java | 3 +++ .../org/apache/storm/executor/bolt/BoltExecutor.java | 4 +++- .../apache/storm/executor/spout/SpoutExecutor.java| 4 +++- 4 files changed, 21 insertions(+), 9 deletions(-)
[storm] branch master updated: STORM-3342: Add license-maven-plugin configuration, and describe how to use it
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git The following commit(s) were added to refs/heads/master by this push: new 499ba07 STORM-3342: Add license-maven-plugin configuration, and describe how to use it new 0eb14a5 Merge pull request #2959 from srdo/licenses 499ba07 is described below commit 499ba076a9e1b2f93eda03895d761d1ff0bc6332 Author: Stig Rohde Døssing AuthorDate: Sun May 27 00:06:15 2018 +0200 STORM-3342: Add license-maven-plugin configuration, and describe how to use it --- DEVELOPER.md | 7 +++- THIRD-PARTY.properties | 41 pom.xml| 87 ++ 3 files changed, 134 insertions(+), 1 deletion(-) diff --git a/DEVELOPER.md b/DEVELOPER.md index 07add43..6abd622 100644 --- a/DEVELOPER.md +++ b/DEVELOPER.md @@ -306,9 +306,14 @@ You can also run tests selectively with `-Dtest=`. This works for bo Unfortunately you might experience failures in clojure tests which are wrapped in the `maven-clojure-plugin` and thus doesn't provide too much useful output at first sight - you might end up with a maven test failure with an error message as unhelpful as `Clojure failed.`. In this case it's recommended to look into `target/test-reports` of the failed project to see what actual tests have failed or scroll through the maven output looking for obvious issues like missing binaries. -By default integration tests are not run in the test phase. To run Java and Clojure integration tests you must enable the profile +By default integration tests are not run in the test phase. To run Java and Clojure integration tests you must enable the profile `integration-tests-only`, or `all-tests`. +## Listing dependency licenses +You can generate a list of dependencies and their licenses by running `mvn generate-resources -Dlicense.skipAggregateAddThirdParty=false` in the project root. +The list will be put in target/generated-sources/license/THIRD-PARTY.txt. + +The license aggregation plugin will use the license listed in a dependency's POM. If the license is missing, or incomplete (e.g. due to multiple licenses), you can override the license by describing the dependency in the THIRD-PARTY.properties file in the project root. diff --git a/THIRD-PARTY.properties b/THIRD-PARTY.properties new file mode 100644 index 000..2f0f2af --- /dev/null +++ b/THIRD-PARTY.properties @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +classworlds--classworlds--1.1-alpha-2=Apache License version 2.0 +com.twitter--carbonite--1.5.0=Apache License version 2.0 +commons-beanutils--commons-beanutils--1.7.0=Apache License version 2.0 +commons-logging--commons-logging--1.0.3=Apache License version 2.0 +io.confluent--kafka-avro-serializer--1.0=Apache License version 2.0 +io.confluent--kafka-schema-registry-client--1.0=Apache License version 2.0 +org.apache.zookeeper--zookeeper--3.4.6=Apache License version 2.0 +org.codehaus.jettison--jettison--1.1=Apache License version 2.0 +org.codehaus.plexus--plexus-container-default--1.0-alpha-9-stable-1=Apache License version 2.0 +org.jdom--jdom--1.1=Apache License version 2.0 +oro--oro--2.0.8=Apache License version 2.0 + +asm--asm--3.1=BSD 3-Clause License +asm--asm-commons--3.1=BSD 3-Clause License +asm--asm-tree--3.1=BSD 3-Clause License + +javax.jms--jms--1.1=COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.1 +javax.servlet--jsp-api--2.0=COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.1 +javax.servlet--servlet-api--2.4=COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.1 +javax.servlet--servlet-api--2.5=COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.1 +javax.servlet.jsp--jsp-api--2.1=COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.1 +javax.transaction--jta--1.1=COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.1 +javax.transaction--transaction-api--1.1=COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.1 +org.glassfish.jersey--jersey-bom--2.27=COMMON DEVELOPMENT AND DISTRIB
[storm] branch master updated: STORM-3342: Add license-maven-plugin configuration, and describe how to use it
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git The following commit(s) were added to refs/heads/master by this push: new 499ba07 STORM-3342: Add license-maven-plugin configuration, and describe how to use it new 0eb14a5 Merge pull request #2959 from srdo/licenses 499ba07 is described below commit 499ba076a9e1b2f93eda03895d761d1ff0bc6332 Author: Stig Rohde Døssing AuthorDate: Sun May 27 00:06:15 2018 +0200 STORM-3342: Add license-maven-plugin configuration, and describe how to use it --- DEVELOPER.md | 7 +++- THIRD-PARTY.properties | 41 pom.xml| 87 ++ 3 files changed, 134 insertions(+), 1 deletion(-) diff --git a/DEVELOPER.md b/DEVELOPER.md index 07add43..6abd622 100644 --- a/DEVELOPER.md +++ b/DEVELOPER.md @@ -306,9 +306,14 @@ You can also run tests selectively with `-Dtest=`. This works for bo Unfortunately you might experience failures in clojure tests which are wrapped in the `maven-clojure-plugin` and thus doesn't provide too much useful output at first sight - you might end up with a maven test failure with an error message as unhelpful as `Clojure failed.`. In this case it's recommended to look into `target/test-reports` of the failed project to see what actual tests have failed or scroll through the maven output looking for obvious issues like missing binaries. -By default integration tests are not run in the test phase. To run Java and Clojure integration tests you must enable the profile +By default integration tests are not run in the test phase. To run Java and Clojure integration tests you must enable the profile `integration-tests-only`, or `all-tests`. +## Listing dependency licenses +You can generate a list of dependencies and their licenses by running `mvn generate-resources -Dlicense.skipAggregateAddThirdParty=false` in the project root. +The list will be put in target/generated-sources/license/THIRD-PARTY.txt. + +The license aggregation plugin will use the license listed in a dependency's POM. If the license is missing, or incomplete (e.g. due to multiple licenses), you can override the license by describing the dependency in the THIRD-PARTY.properties file in the project root. diff --git a/THIRD-PARTY.properties b/THIRD-PARTY.properties new file mode 100644 index 000..2f0f2af --- /dev/null +++ b/THIRD-PARTY.properties @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +classworlds--classworlds--1.1-alpha-2=Apache License version 2.0 +com.twitter--carbonite--1.5.0=Apache License version 2.0 +commons-beanutils--commons-beanutils--1.7.0=Apache License version 2.0 +commons-logging--commons-logging--1.0.3=Apache License version 2.0 +io.confluent--kafka-avro-serializer--1.0=Apache License version 2.0 +io.confluent--kafka-schema-registry-client--1.0=Apache License version 2.0 +org.apache.zookeeper--zookeeper--3.4.6=Apache License version 2.0 +org.codehaus.jettison--jettison--1.1=Apache License version 2.0 +org.codehaus.plexus--plexus-container-default--1.0-alpha-9-stable-1=Apache License version 2.0 +org.jdom--jdom--1.1=Apache License version 2.0 +oro--oro--2.0.8=Apache License version 2.0 + +asm--asm--3.1=BSD 3-Clause License +asm--asm-commons--3.1=BSD 3-Clause License +asm--asm-tree--3.1=BSD 3-Clause License + +javax.jms--jms--1.1=COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.1 +javax.servlet--jsp-api--2.0=COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.1 +javax.servlet--servlet-api--2.4=COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.1 +javax.servlet--servlet-api--2.5=COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.1 +javax.servlet.jsp--jsp-api--2.1=COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.1 +javax.transaction--jta--1.1=COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.1 +javax.transaction--transaction-api--1.1=COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.1 +org.glassfish.jersey--jersey-bom--2.27=COMMON DEVELOPMENT AND DISTRIB
[storm] branch master updated (e909b3d -> b3c0c8b)
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/storm.git. from e909b3d STORM-3330: More storm-webapp path cleanup new 4b1caa0 [MINOR] Update repository URLs from http to https new fe9c734 Merge branch 'MINOR-http-repo-to-https' new 7b0bc3b [MINOR] Update release verify script - 'sha' to 'sha512' new b3c0c8b Merge branch 'MINOR-rc-sha-to-sha512' The 4 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: dev-tools/rc/verify-release-file.sh| 6 +++--- external/storm-blobstore-migration/pom.xml | 2 +- external/storm-hdfs/pom.xml| 2 +- external/storm-mqtt/pom.xml| 2 +- pom.xml| 6 +++--- 5 files changed, 9 insertions(+), 9 deletions(-)
[storm] 04/04: Merge branch 'MINOR-rc-sha-to-sha512'
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git commit b3c0c8b6f0bc02ae2100335ea35ab4b0a3888499 Merge: fe9c734 7b0bc3b Author: Jungtaek Lim (HeartSaVioR) AuthorDate: Tue Feb 26 22:59:57 2019 +0900 Merge branch 'MINOR-rc-sha-to-sha512' dev-tools/rc/verify-release-file.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-)
[storm] 02/04: [MINOR] Update release verify script - 'sha' to 'sha512'
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git commit 7b0bc3b36e7d89e332d47956b747c3f77a6a21ad Author: Jungtaek Lim (HeartSaVioR) AuthorDate: Mon Feb 25 22:54:34 2019 +0900 [MINOR] Update release verify script - 'sha' to 'sha512' --- dev-tools/rc/verify-release-file.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dev-tools/rc/verify-release-file.sh b/dev-tools/rc/verify-release-file.sh index dcb2202..2e33965 100755 --- a/dev-tools/rc/verify-release-file.sh +++ b/dev-tools/rc/verify-release-file.sh @@ -50,12 +50,12 @@ else fi # checking SHA -GPG_SHA_FILE="/tmp/${TARGET_FILE}_GPG.sha" +GPG_SHA_FILE="/tmp/${TARGET_FILE}_GPG.sha512" gpg --print-md SHA512 ${TARGET_FILE} > ${GPG_SHA_FILE} -SHA_TARGET_FILE="${TARGET_FILE}.sha" +SHA_TARGET_FILE="${TARGET_FILE}.sha512" echo ">> checking SHA file... (${SHA_TARGET_FILE})" -diff /tmp/${TARGET_FILE}_GPG.sha ${SHA_TARGET_FILE} +diff /tmp/${TARGET_FILE}_GPG.sha512 ${SHA_TARGET_FILE} if [ $? -eq 0 ]; then
[2/3] storm git commit: STORM-3292: log an error message if tick tuple interval is set in HiveOptions for Trident
STORM-3292: log an error message if tick tuple interval is set in HiveOptions for Trident Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b7a8d749 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b7a8d749 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b7a8d749 Branch: refs/heads/master Commit: b7a8d7492c9036fbf5e5c269f121265e68c65cd0 Parents: 07634fc Author: Arun Mahadevan Authored: Tue Nov 27 10:30:28 2018 -0800 Committer: Arun Mahadevan Committed: Tue Nov 27 11:00:36 2018 -0800 -- .../main/java/org/apache/storm/hive/common/HiveOptions.java | 2 +- .../java/org/apache/storm/hive/trident/HiveStateFactory.java | 7 +++ 2 files changed, 8 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/b7a8d749/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java -- diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java index 4a91da1..db0fa69 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java @@ -19,7 +19,7 @@ public class HiveOptions implements Serializable { /** * Half of the default Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS */ -private static final int DEFAULT_TICK_TUPLE_INTERVAL_SECS = 15; +public static final int DEFAULT_TICK_TUPLE_INTERVAL_SECS = 15; protected HiveMapper mapper; protected String databaseName; http://git-wip-us.apache.org/repos/asf/storm/blob/b7a8d749/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java -- diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java index 68722d1..e6a7d84 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java @@ -27,7 +27,14 @@ public class HiveStateFactory implements StateFactory { public HiveStateFactory() {} +/** + * The options for connecting to Hive. + */ public HiveStateFactory withOptions(HiveOptions options) { +if (options.getTickTupleInterval() != HiveOptions.DEFAULT_TICK_TUPLE_INTERVAL_SECS) { +LOG.error("Tick tuple interval will be ignored for trident." ++ " The Hive writers are flushed after each batch."); +} this.options = options; return this; }
[1/3] storm git commit: STORM-3292: flush writers in HiveState when the trident batch commits
Repository: storm Updated Branches: refs/heads/master 6d871a73e -> 94cd157c0 STORM-3292: flush writers in HiveState when the trident batch commits Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/07634fcb Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/07634fcb Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/07634fcb Branch: refs/heads/master Commit: 07634fcb881a53d548827f27b1a9388ff8982f5e Parents: 29eb449 Author: Arun Mahadevan Authored: Mon Nov 26 18:15:58 2018 -0800 Committer: Arun Mahadevan Committed: Mon Nov 26 18:17:55 2018 -0800 -- .../main/java/org/apache/storm/hive/trident/HiveState.java| 7 +++ 1 file changed, 7 insertions(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/07634fcb/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java -- diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java index 6717329..b9494da 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java @@ -64,6 +64,13 @@ public class HiveState implements State { @Override public void commit(Long txId) { +try { +flushAllWriters(); +currentBatchSize = 0; +} catch (HiveWriter.TxnFailure | InterruptedException | HiveWriter.CommitFailure | HiveWriter.TxnBatchFailure ex) { +LOG.warn("Commit failed. Failing the batch.", ex); +throw new FailedException(ex); +} } public void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
[1/2] storm git commit: STORM-3292: flush writers in HiveState when the trident batch commits
Repository: storm Updated Branches: refs/heads/1.x-branch 8eac25b34 -> e6828f25f STORM-3292: flush writers in HiveState when the trident batch commits Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/954ebe2c Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/954ebe2c Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/954ebe2c Branch: refs/heads/1.x-branch Commit: 954ebe2c059eb0f21888224935fdfe3d7d653f4f Parents: 8eac25b Author: Arun Mahadevan Authored: Mon Nov 26 18:15:58 2018 -0800 Committer: Jungtaek Lim (HeartSaVioR) Committed: Sun Dec 2 15:53:58 2018 +0900 -- .../main/java/org/apache/storm/hive/trident/HiveState.java| 7 +++ 1 file changed, 7 insertions(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/954ebe2c/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java -- diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java index 10b3591..a4cefbe 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java @@ -71,6 +71,13 @@ public class HiveState implements State { @Override public void commit(Long txId) { +try { +flushAllWriters(); +currentBatchSize = 0; +} catch (HiveWriter.TxnFailure | InterruptedException | HiveWriter.CommitFailure | HiveWriter.TxnBatchFailure ex) { +LOG.warn("Commit failed. Failing the batch.", ex); +throw new FailedException(ex); +} } public void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
[3/3] storm git commit: Merge branch 'STORM-3292' of https://github.com/arunmahadevan/storm into STORM-3292
Merge branch 'STORM-3292' of https://github.com/arunmahadevan/storm into STORM-3292 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/94cd157c Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/94cd157c Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/94cd157c Branch: refs/heads/master Commit: 94cd157c0f12ceb6538169ce4b4f6193f7f77eed Parents: 6d871a7 b7a8d74 Author: Jungtaek Lim (HeartSaVioR) Authored: Sun Dec 2 15:52:56 2018 +0900 Committer: Jungtaek Lim (HeartSaVioR) Committed: Sun Dec 2 15:52:56 2018 +0900 -- .../main/java/org/apache/storm/hive/common/HiveOptions.java | 2 +- .../main/java/org/apache/storm/hive/trident/HiveState.java| 7 +++ .../java/org/apache/storm/hive/trident/HiveStateFactory.java | 7 +++ 3 files changed, 15 insertions(+), 1 deletion(-) --
storm git commit: Retrigger sync with ASF git repo
Repository: storm Updated Branches: refs/heads/1.x-branch 590736405 -> 8eac25b34 Retrigger sync with ASF git repo Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8eac25b3 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8eac25b3 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8eac25b3 Branch: refs/heads/1.x-branch Commit: 8eac25b346f2911cf884e2d514c68f3dcbe94f69 Parents: 5907364 Author: Jungtaek Lim (HeartSaVioR) Authored: Mon Nov 19 11:55:27 2018 +0900 Committer: Jungtaek Lim (HeartSaVioR) Committed: Mon Nov 19 11:55:27 2018 +0900 -- --
[1/4] storm git commit: STORM-3123 - add support for Kafka security config in storm-kafka-monitor
Repository: storm Updated Branches: refs/heads/1.x-branch 2f3a04dcc -> 590736405 STORM-3123 - add support for Kafka security config in storm-kafka-monitor Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/371cc269 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/371cc269 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/371cc269 Branch: refs/heads/1.x-branch Commit: 371cc269a6dec5ee6172d7631f7f5574d6eab566 Parents: 2f3a04d Author: Vipin Rathor Authored: Wed Jul 11 17:01:36 2018 -0700 Committer: Arun Mahadevan Committed: Mon Nov 12 12:38:55 2018 -0800 -- .../storm/kafka/monitor/KafkaOffsetLagUtil.java | 15 --- .../kafka/monitor/NewKafkaSpoutOffsetQuery.java | 9 - 2 files changed, 20 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/371cc269/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java -- diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java index fa80da0..325d608 100644 --- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java +++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java @@ -30,6 +30,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Utils; import org.json.simple.JSONValue; import java.util.ArrayList; @@ -71,6 +72,8 @@ public class KafkaOffsetLagUtil { private static final String OPTION_ZK_BROKERS_ROOT_LONG = "zk-brokers-root-node"; private static final String OPTION_SECURITY_PROTOCOL_SHORT = "s"; private static final String OPTION_SECURITY_PROTOCOL_LONG = "security-protocol"; +private static final String OPTION_CONSUMER_CONFIG_SHORT = "c"; +private static final String OPTION_CONSUMER_CONFIG_LONG = "consumer-config"; public static void main (String args[]) { try { @@ -136,10 +139,10 @@ public class KafkaOffsetLagUtil { " is not specified"); } NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery = new NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), - commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG), commandLine.getOptionValue(OPTION_GROUP_ID_LONG), securityProtocol); + commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG), commandLine.getOptionValue(OPTION_GROUP_ID_LONG), +securityProtocol, commandLine.getOptionValue(OPTION_CONSUMER_CONFIG_LONG)); results = getOffsetLags(newKafkaSpoutOffsetQuery); } - Map> keyedResult = keyByTopicAndPartition(results); System.out.print(JSONValue.toJSONString(keyedResult)); } catch (Exception ex) { @@ -195,6 +198,8 @@ public class KafkaOffsetLagUtil { options.addOption(OPTION_ZK_BROKERS_ROOT_SHORT, OPTION_ZK_BROKERS_ROOT_LONG, true, "Zk node prefix where kafka stores broker information e.g. " + "/brokers (applicable only for old kafka spout) "); options.addOption(OPTION_SECURITY_PROTOCOL_SHORT, OPTION_SECURITY_PROTOCOL_LONG, true, "Security protocol to connect to kafka"); +options.addOption(OPTION_CONSUMER_CONFIG_SHORT, OPTION_CONSUMER_CONFIG_LONG, true, "Security configuration file useful " + + "when connecting to secure kafka"); return options; } @@ -203,7 +208,7 @@ public class KafkaOffsetLagUtil { * @param newKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets * @return log head offset, spout offset and lag for each partition */ -public static List getOffsetLags (NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery) { +public static List getOffsetLags(NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery) throws Exception { KafkaConsumer consumer = null; List result = new ArrayList<>(); try { @@ -216,6 +221,10 @@ public class KafkaOffsetLagUtil { props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); if (newKafkaSpoutOffsetQuery.getSecurityProtocol() != null) { props.put("security.protocol", newKafkaSpoutOffsetQuery.getSecurityProtocol()); +
[3/4] storm git commit: STORM-3123: Address review comments
STORM-3123: Address review comments Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3e655cba Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3e655cba Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3e655cba Branch: refs/heads/1.x-branch Commit: 3e655cba0a621aae24a3300c2b75d1b15300032b Parents: bbe7827 Author: Arun Mahadevan Authored: Thu Nov 15 17:12:44 2018 -0800 Committer: Arun Mahadevan Committed: Thu Nov 15 17:20:00 2018 -0800 -- .../java/org/apache/storm/kafka/spout/KafkaSpout.java | 2 ++ .../apache/storm/kafka/monitor/KafkaOffsetLagUtil.java| 4 ++-- .../storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java | 10 +- .../src/jvm/org/apache/storm/utils/TopologySpoutLag.java | 4 ++-- 4 files changed, 11 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/3e655cba/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java -- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 94cb893..78b59ae 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -711,6 +711,8 @@ public class KafkaSpout extends BaseRichSpout { for (Entry conf: kafkaSpoutConfig.getKafkaProps().entrySet()) { if (conf.getValue() != null && isPrimitiveOrWrapper(conf.getValue().getClass())) { configuration.put(configKeyPrefix + conf.getKey(), conf.getValue()); +} else { +LOG.debug("Dropping Kafka prop '{}' from component configuration", conf.getKey()); } } return configuration; http://git-wip-us.apache.org/repos/asf/storm/blob/3e655cba/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java -- diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java index 6f29043..d8adcea 100644 --- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java +++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java @@ -223,8 +223,8 @@ public class KafkaOffsetLagUtil { props.put("security.protocol", newKafkaSpoutOffsetQuery.getSecurityProtocol()); } // Read property file for extra consumer properties -if (newKafkaSpoutOffsetQuery.getConsumerConfig() != null) { - props.putAll(Utils.loadProps(newKafkaSpoutOffsetQuery.getConsumerConfig())); +if (newKafkaSpoutOffsetQuery.getConsumerPropertiesFileName() != null) { + props.putAll(Utils.loadProps(newKafkaSpoutOffsetQuery.getConsumerPropertiesFileName())); } List topicPartitionList = new ArrayList<>(); consumer = new KafkaConsumer<>(props); http://git-wip-us.apache.org/repos/asf/storm/blob/3e655cba/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java -- diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java index c6cafaf..8d51856 100644 --- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java +++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java @@ -25,15 +25,15 @@ public class NewKafkaSpoutOffsetQuery { private final String consumerGroupId; // consumer group id for which the offset needs to be calculated private final String bootStrapBrokers; // bootstrap brokers private final String securityProtocol; // security protocol to connect to kafka -private final String consumerConfig; // security configuration file to connect to secure kafka +private final String consumerPropertiesFileName; // properties file containing additional kafka consumer configs public NewKafkaSpoutOffsetQuery(String topics, String bootstrapBrokers, String consumerGroupId, String securityProtocol, -String consumerConfig) { +String consumerPropertiesFileName) {
[4/4] storm git commit: Merge branch 'STORM-3123-1.x' of https://github.com/arunmahadevan/storm into STORM-3123-1.x
Merge branch 'STORM-3123-1.x' of https://github.com/arunmahadevan/storm into STORM-3123-1.x Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/59073640 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/59073640 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/59073640 Branch: refs/heads/1.x-branch Commit: 59073640554e31a098d6c08fd39aa3e910d9a4b3 Parents: 2f3a04d 3e655cb Author: Jungtaek Lim (HeartSaVioR) Authored: Mon Nov 19 10:53:07 2018 +0900 Committer: Jungtaek Lim (HeartSaVioR) Committed: Mon Nov 19 10:53:07 2018 +0900 -- .../apache/storm/kafka/spout/KafkaSpout.java| 22 ++- .../storm/kafka/monitor/KafkaOffsetLagUtil.java | 15 - .../kafka/monitor/NewKafkaSpoutOffsetQuery.java | 9 ++- .../apache/storm/utils/TopologySpoutLag.java| 60 +--- 4 files changed, 92 insertions(+), 14 deletions(-) --
[2/4] storm git commit: STORM-3123: Changes to return extra properties from KafkaSpout and use it in TopologySpoutLag
STORM-3123: Changes to return extra properties from KafkaSpout and use it in TopologySpoutLag Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bbe78279 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bbe78279 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bbe78279 Branch: refs/heads/1.x-branch Commit: bbe7827987dc03393652740522975b7bf0169c64 Parents: 371cc26 Author: Arun Mahadevan Authored: Mon Nov 12 11:19:31 2018 -0800 Committer: Arun Mahadevan Committed: Thu Nov 15 17:20:00 2018 -0800 -- .../apache/storm/kafka/spout/KafkaSpout.java| 20 ++- .../storm/kafka/monitor/KafkaOffsetLagUtil.java | 12 ++-- .../apache/storm/utils/TopologySpoutLag.java| 60 +--- 3 files changed, 76 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/bbe78279/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java -- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 9bb77b8..94cb893 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -708,11 +708,27 @@ public class KafkaSpout extends BaseRichSpout { configuration.put(configKeyPrefix + "topics", getTopicsString()); configuration.put(configKeyPrefix + "groupid", kafkaSpoutConfig.getConsumerGroupId()); -configuration.put(configKeyPrefix + "bootstrap.servers", kafkaSpoutConfig.getKafkaProps().get("bootstrap.servers")); -configuration.put(configKeyPrefix + "security.protocol", kafkaSpoutConfig.getKafkaProps().get("security.protocol")); +for (Entry conf: kafkaSpoutConfig.getKafkaProps().entrySet()) { +if (conf.getValue() != null && isPrimitiveOrWrapper(conf.getValue().getClass())) { +configuration.put(configKeyPrefix + conf.getKey(), conf.getValue()); +} +} return configuration; } +private boolean isPrimitiveOrWrapper(Class type) { +if (type == null) { +return false; +} +return type.isPrimitive() || isWrapper(type); +} + +private boolean isWrapper(Class type) { +return type == Double.class || type == Float.class || type == Long.class || +type == Integer.class || type == Short.class || type == Character.class || +type == Byte.class || type == Boolean.class || type == String.class; +} + private String getTopicsString() { return kafkaSpoutConfig.getSubscription().getTopicsString(); } http://git-wip-us.apache.org/repos/asf/storm/blob/bbe78279/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java -- diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java index 325d608..6f29043 100644 --- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java +++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java @@ -198,8 +198,8 @@ public class KafkaOffsetLagUtil { options.addOption(OPTION_ZK_BROKERS_ROOT_SHORT, OPTION_ZK_BROKERS_ROOT_LONG, true, "Zk node prefix where kafka stores broker information e.g. " + "/brokers (applicable only for old kafka spout) "); options.addOption(OPTION_SECURITY_PROTOCOL_SHORT, OPTION_SECURITY_PROTOCOL_LONG, true, "Security protocol to connect to kafka"); -options.addOption(OPTION_CONSUMER_CONFIG_SHORT, OPTION_CONSUMER_CONFIG_LONG, true, "Security configuration file useful " - + "when connecting to secure kafka"); +options.addOption(OPTION_CONSUMER_CONFIG_SHORT, OPTION_CONSUMER_CONFIG_LONG, true, "Properties file with additional " + +"Kafka consumer properties"); return options; } @@ -221,10 +221,10 @@ public class KafkaOffsetLagUtil { props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); if (newKafkaSpoutOffsetQuery.getSecurityProtocol() != null) { props.put("security.protocol", newKafkaSpoutOffsetQuery.getSecurityProtocol()); -// Read Kafka property file for extra security options -if
[1/4] storm git commit: STORM-3123 - add support for Kafka security config in storm-kafka-monitor
Repository: storm Updated Branches: refs/heads/master f17b3dad8 -> 29eb449ee STORM-3123 - add support for Kafka security config in storm-kafka-monitor Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/40e24ce4 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/40e24ce4 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/40e24ce4 Branch: refs/heads/master Commit: 40e24ce45a7744e2d24d4e8d3f6f146372c57824 Parents: 98ed0a8 Author: Vipin Rathor Authored: Wed Jul 11 17:01:36 2018 -0700 Committer: Arun Mahadevan Committed: Mon Nov 12 18:12:18 2018 -0800 -- .../storm/kafka/monitor/KafkaOffsetLagUtil.java | 14 -- .../storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java | 9 - 2 files changed, 20 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/40e24ce4/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java -- diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java index 78b6993..e31fad4 100644 --- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java +++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java @@ -33,6 +33,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Utils; import org.json.simple.JSONValue; /** @@ -47,6 +48,8 @@ public class KafkaOffsetLagUtil { private static final String OPTION_GROUP_ID_LONG = "groupid"; private static final String OPTION_SECURITY_PROTOCOL_SHORT = "s"; private static final String OPTION_SECURITY_PROTOCOL_LONG = "security-protocol"; +private static final String OPTION_CONSUMER_CONFIG_SHORT = "c"; +private static final String OPTION_CONSUMER_CONFIG_LONG = "consumer-config"; public static void main(String args[]) { try { @@ -63,7 +66,8 @@ public class KafkaOffsetLagUtil { NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery = new NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG), -commandLine.getOptionValue(OPTION_GROUP_ID_LONG), securityProtocol); +commandLine.getOptionValue(OPTION_GROUP_ID_LONG), securityProtocol, +commandLine.getOptionValue(OPTION_CONSUMER_CONFIG_LONG)); List results = getOffsetLags(newKafkaSpoutOffsetQuery); Map> keyedResult = keyByTopicAndPartition(results); @@ -110,6 +114,8 @@ public class KafkaOffsetLagUtil { "consumer/spout e.g. hostname1:9092,hostname2:9092"); options.addOption(OPTION_GROUP_ID_SHORT, OPTION_GROUP_ID_LONG, true, "Group id of consumer"); options.addOption(OPTION_SECURITY_PROTOCOL_SHORT, OPTION_SECURITY_PROTOCOL_LONG, true, "Security protocol to connect to kafka"); +options.addOption(OPTION_CONSUMER_CONFIG_SHORT, OPTION_CONSUMER_CONFIG_LONG, true, "Security configuration file useful " + + "when connecting to secure kafka"); return options; } @@ -117,7 +123,7 @@ public class KafkaOffsetLagUtil { * @param newKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets * @return log head offset, spout offset and lag for each partition */ -public static List getOffsetLags(NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery) { +public static List getOffsetLags(NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery) throws Exception { KafkaConsumer consumer = null; List result = new ArrayList<>(); try { @@ -130,6 +136,10 @@ public class KafkaOffsetLagUtil { props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); if (newKafkaSpoutOffsetQuery.getSecurityProtocol() != null) { props.put("security.protocol", newKafkaSpoutOffsetQuery.getSecurityProtocol()); +// Read Kafka property file for extra security options +if (newKafkaSpoutOffsetQuery.getConsumerConfig() != null) { + props.putAll(Utils.loadProps(newKafkaSpoutOffsetQuery.getConsumerConfig())); +} } List topicPartitionList = new
[3/4] storm git commit: STORM-3123: Address review comments
STORM-3123: Address review comments Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d8d88374 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d8d88374 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d8d88374 Branch: refs/heads/master Commit: d8d88374d992a7f3b6455e26fececa2bf50d5165 Parents: fa0b862 Author: Arun Mahadevan Authored: Thu Nov 15 10:38:51 2018 -0800 Committer: Arun Mahadevan Committed: Thu Nov 15 10:38:58 2018 -0800 -- .../java/org/apache/storm/kafka/spout/KafkaSpout.java | 2 ++ .../apache/storm/kafka/monitor/KafkaOffsetLagUtil.java| 4 ++-- .../storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java | 10 +- .../src/jvm/org/apache/storm/utils/TopologySpoutLag.java | 4 ++-- 4 files changed, 11 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/d8d88374/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java -- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index cc8cb77..7d7a856 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -695,6 +695,8 @@ public class KafkaSpout extends BaseRichSpout { for (Entry conf: kafkaSpoutConfig.getKafkaProps().entrySet()) { if (conf.getValue() != null && isPrimitiveOrWrapper(conf.getValue().getClass())) { configuration.put(configKeyPrefix + conf.getKey(), conf.getValue()); +} else { +LOG.debug("Dropping Kafka prop '{}' from component configuration", conf.getKey()); } } return configuration; http://git-wip-us.apache.org/repos/asf/storm/blob/d8d88374/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java -- diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java index 1d53436..398ebae 100644 --- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java +++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java @@ -138,8 +138,8 @@ public class KafkaOffsetLagUtil { props.put("security.protocol", newKafkaSpoutOffsetQuery.getSecurityProtocol()); } // Read property file for extra consumer properties -if (newKafkaSpoutOffsetQuery.getConsumerConfig() != null) { - props.putAll(Utils.loadProps(newKafkaSpoutOffsetQuery.getConsumerConfig())); +if (newKafkaSpoutOffsetQuery.getConsumerPropertiesFileName() != null) { + props.putAll(Utils.loadProps(newKafkaSpoutOffsetQuery.getConsumerPropertiesFileName())); } List topicPartitionList = new ArrayList<>(); consumer = new KafkaConsumer<>(props); http://git-wip-us.apache.org/repos/asf/storm/blob/d8d88374/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java -- diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java index 53df461..0327ea0 100644 --- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java +++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java @@ -27,15 +27,15 @@ public class NewKafkaSpoutOffsetQuery { private final String consumerGroupId; // consumer group id for which the offset needs to be calculated private final String bootStrapBrokers; // bootstrap brokers private final String securityProtocol; // security protocol to connect to kafka -private final String consumerConfig; // security configuration file to connect to secure kafka +private final String consumerPropertiesFileName; // properties file containing additional kafka consumer configs public NewKafkaSpoutOffsetQuery(String topics, String bootstrapBrokers, String consumerGroupId, String securityProtocol, -String consumerConfig) { +String consumerPropertiesFileName) {
[4/4] storm git commit: Merge branch 'STORM-3123' of https://github.com/arunmahadevan/storm into STORM-3123
Merge branch 'STORM-3123' of https://github.com/arunmahadevan/storm into STORM-3123 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/29eb449e Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/29eb449e Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/29eb449e Branch: refs/heads/master Commit: 29eb449ee2518d4d2f750fa32b667b671912cbad Parents: f17b3da d8d8837 Author: Jungtaek Lim (HeartSaVioR) Authored: Fri Nov 16 08:14:47 2018 +0900 Committer: Jungtaek Lim (HeartSaVioR) Committed: Fri Nov 16 08:14:47 2018 +0900 -- .../apache/storm/kafka/spout/KafkaSpout.java| 23 +++- .../storm/kafka/monitor/KafkaOffsetLagUtil.java | 14 - .../kafka/monitor/NewKafkaSpoutOffsetQuery.java | 9 ++- .../apache/storm/utils/TopologySpoutLag.java| 60 +--- 4 files changed, 93 insertions(+), 13 deletions(-) --
[2/4] storm git commit: STORM-3123: Changes to return extra properties from KafkaSpout and use it in TopologySpoutLag
STORM-3123: Changes to return extra properties from KafkaSpout and use it in TopologySpoutLag Change-Id: Id6e3ce120cc813adbe611d085cd4bc3ebd0ff590 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fa0b8624 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fa0b8624 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fa0b8624 Branch: refs/heads/master Commit: fa0b8624492630d0f5c3c89e6bf7a5d20ac59d8b Parents: 40e24ce Author: Arun Mahadevan Authored: Mon Nov 12 11:19:31 2018 -0800 Committer: Arun Mahadevan Committed: Tue Nov 13 00:38:50 2018 -0800 -- .../apache/storm/kafka/spout/KafkaSpout.java| 21 ++- .../storm/kafka/monitor/KafkaOffsetLagUtil.java | 12 ++-- .../apache/storm/utils/TopologySpoutLag.java| 60 +--- 3 files changed, 77 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/fa0b8624/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java -- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 1ee0a5c..cc8cb77 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -692,11 +692,28 @@ public class KafkaSpout extends BaseRichSpout { configuration.put(configKeyPrefix + "topics", getTopicsString()); configuration.put(configKeyPrefix + "groupid", kafkaSpoutConfig.getConsumerGroupId()); -configuration.put(configKeyPrefix + "bootstrap.servers", kafkaSpoutConfig.getKafkaProps().get("bootstrap.servers")); -configuration.put(configKeyPrefix + "security.protocol", kafkaSpoutConfig.getKafkaProps().get("security.protocol")); +for (Entry conf: kafkaSpoutConfig.getKafkaProps().entrySet()) { +if (conf.getValue() != null && isPrimitiveOrWrapper(conf.getValue().getClass())) { +configuration.put(configKeyPrefix + conf.getKey(), conf.getValue()); +} +} return configuration; } +private boolean isPrimitiveOrWrapper(Class type) { +if (type == null) { +return false; +} +return type.isPrimitive() || isWrapper(type); +} + +private boolean isWrapper(Class type) { +return type == Double.class || type == Float.class || type == Long.class +|| type == Integer.class || type == Short.class || type == Character.class +|| type == Byte.class || type == Boolean.class || type == String.class; +} + + private String getTopicsString() { return kafkaSpoutConfig.getTopicFilter().getTopicsString(); } http://git-wip-us.apache.org/repos/asf/storm/blob/fa0b8624/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java -- diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java index e31fad4..1d53436 100644 --- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java +++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java @@ -114,8 +114,8 @@ public class KafkaOffsetLagUtil { "consumer/spout e.g. hostname1:9092,hostname2:9092"); options.addOption(OPTION_GROUP_ID_SHORT, OPTION_GROUP_ID_LONG, true, "Group id of consumer"); options.addOption(OPTION_SECURITY_PROTOCOL_SHORT, OPTION_SECURITY_PROTOCOL_LONG, true, "Security protocol to connect to kafka"); -options.addOption(OPTION_CONSUMER_CONFIG_SHORT, OPTION_CONSUMER_CONFIG_LONG, true, "Security configuration file useful " - + "when connecting to secure kafka"); +options.addOption(OPTION_CONSUMER_CONFIG_SHORT, OPTION_CONSUMER_CONFIG_LONG, true, "Properties file with additional " + +"Kafka consumer properties"); return options; } @@ -136,10 +136,10 @@ public class KafkaOffsetLagUtil { props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); if (newKafkaSpoutOffsetQuery.getSecurityProtocol() != null) { props.put("security.protocol", newKafkaSpoutOffsetQuery.getSecurityProtocol()); -// Read Kafka property file for extra security options -if
[1/2] storm git commit: STORM-3200 improve keytab error message
Repository: storm Updated Branches: refs/heads/master 518f94703 -> 2f399579d STORM-3200 improve keytab error message Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/04f75bfa Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/04f75bfa Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/04f75bfa Branch: refs/heads/master Commit: 04f75bfad131ac2be4450433231814f3eba25717 Parents: 518f947 Author: Aaron Gresch Authored: Tue Aug 21 14:32:16 2018 -0500 Committer: Aaron Gresch Committed: Tue Aug 21 14:32:16 2018 -0500 -- .../main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/04f75bfa/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java -- diff --git a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java index 2627231..e6608c5 100644 --- a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java +++ b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java @@ -162,7 +162,7 @@ public class HdfsBlobStore extends BlobStore { localSubject = getHadoopUser(); } } catch (IOException e) { -throw new RuntimeException("Error logging in from keytab!", e); +throw new RuntimeException("Error logging in from keytab: " + e.getMessage(), e); } aclHandler = new BlobStoreAclHandler(conf); Path baseDir = new Path(overrideBase, BASE_BLOBS_DIR_NAME);
[2/2] storm git commit: Merge branch 'agresch_blobstore_debug' of https://github.com/agresch/storm into STORM-3200-merge
Merge branch 'agresch_blobstore_debug' of https://github.com/agresch/storm into STORM-3200-merge Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2f399579 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2f399579 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2f399579 Branch: refs/heads/master Commit: 2f399579d2ab990946b0b7a5b36c9b0340019c82 Parents: 518f947 04f75bf Author: Jungtaek Lim Authored: Thu Aug 23 22:40:32 2018 +0900 Committer: Jungtaek Lim Committed: Thu Aug 23 22:40:32 2018 +0900 -- .../main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --
[1/2] storm git commit: STORM-3184: Replace the usage of redact-value with ConfigUtils.maskPasswords
Repository: storm Updated Branches: refs/heads/1.x-branch ea84f47e1 -> b8f2039d6 STORM-3184: Replace the usage of redact-value with ConfigUtils.maskPasswords The topology submission can fail since redact-value expects a clojure map. We dont need redact-value, it can be replaced with just ConfigUtils.maskPasswords. This is already done in master. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/43faecc8 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/43faecc8 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/43faecc8 Branch: refs/heads/1.x-branch Commit: 43faecc870468a00d5f1b2d5ba1c5c4274ae77b0 Parents: fc942ee Author: Arun Mahadevan Authored: Mon Aug 20 16:31:32 2018 -0700 Committer: Arun Mahadevan Committed: Tue Aug 21 15:36:18 2018 -0700 -- storm-core/src/clj/org/apache/storm/daemon/nimbus.clj | 2 +- storm-core/src/clj/org/apache/storm/daemon/worker.clj | 2 +- storm-core/src/jvm/org/apache/storm/Config.java | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/43faecc8/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj -- diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index fc89ac4..e89fc9e 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -1748,7 +1748,7 @@ " (storm-" (.get_storm_version topology) " JDK-" (.get_jdk_version topology) ") with conf " - (redact-value (ConfigUtils/maskPasswords storm-conf) STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)) + (ConfigUtils/maskPasswords storm-conf)) ;; lock protects against multiple topologies being submitted at once and ;; cleanup thread killing topology in b/w assignment and starting the topology (locking (:submit-lock nimbus) http://git-wip-us.apache.org/repos/asf/storm/blob/43faecc8/storm-core/src/clj/org/apache/storm/daemon/worker.clj -- diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj index 13daa10..52b4af1 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj @@ -778,7 +778,7 @@ (schedule-recurring (:reset-log-levels-timer worker) 0 (conf WORKER-LOG-LEVEL-RESET-POLL-SECS) (fn [] (reset-log-levels latest-log-config))) (schedule-recurring (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) (partial refresh-storm-active worker)) -(log-message "Worker has topology config " (redact-value (ConfigUtils/maskPasswords (:storm-conf worker)) STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)) +(log-message "Worker has topology config " (ConfigUtils/maskPasswords (:storm-conf worker))) (log-message "Worker " worker-id " for storm " storm-id " on " assignment-id ":" port " has finished loading") ret )) http://git-wip-us.apache.org/repos/asf/storm/blob/43faecc8/storm-core/src/jvm/org/apache/storm/Config.java -- diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java index fc9fb55..4849b97 100644 --- a/storm-core/src/jvm/org/apache/storm/Config.java +++ b/storm-core/src/jvm/org/apache/storm/Config.java @@ -412,6 +412,7 @@ public class Config extends HashMap { * A string representing the payload for topology Zookeeper authentication. It gets serialized using UTF-8 encoding during authentication. */ @isString +@Password public static final String STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD="storm.zookeeper.topology.auth.payload"; /*
[2/2] storm git commit: Merge branch 'STORM-3184-followup' of https://github.com/arunmahadevan/storm into STORM-3184-follow-up-1.x
Merge branch 'STORM-3184-followup' of https://github.com/arunmahadevan/storm into STORM-3184-follow-up-1.x Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b8f2039d Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b8f2039d Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b8f2039d Branch: refs/heads/1.x-branch Commit: b8f2039d64cea990917540f1e0f4a3dacca14db0 Parents: ea84f47 43faecc Author: Jungtaek Lim Authored: Thu Aug 23 22:37:05 2018 +0900 Committer: Jungtaek Lim Committed: Thu Aug 23 22:37:05 2018 +0900 -- storm-core/src/clj/org/apache/storm/daemon/nimbus.clj | 2 +- storm-core/src/clj/org/apache/storm/daemon/worker.clj | 2 +- storm-core/src/jvm/org/apache/storm/Config.java | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) --
[2/2] storm git commit: Merge branch 'storm2578' of https://github.com/milantracy/storm into STORM-2578-merge
Merge branch 'storm2578' of https://github.com/milantracy/storm into STORM-2578-merge Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/518f9470 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/518f9470 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/518f9470 Branch: refs/heads/master Commit: 518f9470312170525fb50cc614a157547514b9b5 Parents: 2875c83 8bbee40 Author: Jungtaek Lim Authored: Mon Aug 20 14:16:03 2018 +0900 Committer: Jungtaek Lim Committed: Mon Aug 20 14:16:03 2018 +0900 -- external/storm-elasticsearch/pom.xml| 2 +- .../DefaultEsLookupResultOutput.java| 11 +++--- .../elasticsearch/EsLookupResultOutput.java | 8 +++- .../elasticsearch/bolt/AbstractEsBolt.java | 7 --- .../storm/elasticsearch/bolt/EsIndexBolt.java | 5 +++-- .../storm/elasticsearch/bolt/EsLookupBolt.java | 12 ++- .../elasticsearch/bolt/EsPercolateBolt.java | 8 +--- .../common/DefaultEsTupleMapper.java| 2 +- .../storm/elasticsearch/common/EsConfig.java| 8 +--- .../elasticsearch/common/EsTupleMapper.java | 5 +++-- .../common/StormElasticSearchClient.java| 9 + .../apache/storm/elasticsearch/doc/Index.java | 8 .../storm/elasticsearch/doc/IndexDoc.java | 1 + .../storm/elasticsearch/doc/IndexItem.java | 1 + .../storm/elasticsearch/doc/IndexItemDoc.java | 1 + .../apache/storm/elasticsearch/doc/Shards.java | 1 + .../storm/elasticsearch/doc/SourceDoc.java | 1 + .../response/BulkIndexResponse.java | 11 ++ .../elasticsearch/response/LookupResponse.java | 5 +++-- .../response/PercolateResponse.java | 5 +++-- .../storm/elasticsearch/trident/EsState.java| 21 .../elasticsearch/trident/EsStateFactory.java | 4 +++- .../storm/elasticsearch/trident/EsUpdater.java | 5 +++-- 23 files changed, 97 insertions(+), 44 deletions(-) --
[1/2] storm git commit: STORM-2578: Apply new code style to storm-elasticsearch
Repository: storm Updated Branches: refs/heads/master 2875c838a -> 518f94703 STORM-2578: Apply new code style to storm-elasticsearch Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8bbee40c Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8bbee40c Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8bbee40c Branch: refs/heads/master Commit: 8bbee40c3c2426aea9028290296cd758c3a3b567 Parents: 88d19b0 Author: Jing Chen Authored: Wed Aug 15 13:34:57 2018 -0700 Committer: Jing Chen Committed: Fri Aug 17 18:41:57 2018 -0700 -- external/storm-elasticsearch/pom.xml| 2 +- .../DefaultEsLookupResultOutput.java| 11 +++--- .../elasticsearch/EsLookupResultOutput.java | 8 +++- .../elasticsearch/bolt/AbstractEsBolt.java | 7 --- .../storm/elasticsearch/bolt/EsIndexBolt.java | 5 +++-- .../storm/elasticsearch/bolt/EsLookupBolt.java | 12 ++- .../elasticsearch/bolt/EsPercolateBolt.java | 8 +--- .../common/DefaultEsTupleMapper.java| 2 +- .../storm/elasticsearch/common/EsConfig.java| 8 +--- .../elasticsearch/common/EsTupleMapper.java | 5 +++-- .../common/StormElasticSearchClient.java| 9 + .../apache/storm/elasticsearch/doc/Index.java | 8 .../storm/elasticsearch/doc/IndexDoc.java | 1 + .../storm/elasticsearch/doc/IndexItem.java | 1 + .../storm/elasticsearch/doc/IndexItemDoc.java | 1 + .../apache/storm/elasticsearch/doc/Shards.java | 1 + .../storm/elasticsearch/doc/SourceDoc.java | 1 + .../response/BulkIndexResponse.java | 11 ++ .../elasticsearch/response/LookupResponse.java | 5 +++-- .../response/PercolateResponse.java | 5 +++-- .../storm/elasticsearch/trident/EsState.java| 21 .../elasticsearch/trident/EsStateFactory.java | 4 +++- .../storm/elasticsearch/trident/EsUpdater.java | 5 +++-- 23 files changed, 97 insertions(+), 44 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/8bbee40c/external/storm-elasticsearch/pom.xml -- diff --git a/external/storm-elasticsearch/pom.xml b/external/storm-elasticsearch/pom.xml index 3386fbd..2394bbb 100644 --- a/external/storm-elasticsearch/pom.xml +++ b/external/storm-elasticsearch/pom.xml @@ -139,7 +139,7 @@ maven-checkstyle-plugin -69 +0 http://git-wip-us.apache.org/repos/asf/storm/blob/8bbee40c/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/DefaultEsLookupResultOutput.java -- diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/DefaultEsLookupResultOutput.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/DefaultEsLookupResultOutput.java index 533dc65..d25bead 100644 --- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/DefaultEsLookupResultOutput.java +++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/DefaultEsLookupResultOutput.java @@ -15,8 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.elasticsearch; +import com.fasterxml.jackson.databind.ObjectMapper; + import java.io.IOException; import java.util.Collection; import java.util.Collections; @@ -26,8 +29,6 @@ import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import org.elasticsearch.client.Response; -import com.fasterxml.jackson.databind.ObjectMapper; - /** * Default implementation of {@link EsLookupResultOutput}. * Outputs the index, type, id and source as strings. @@ -51,7 +52,11 @@ public class DefaultEsLookupResultOutput implements EsLookupResultOutput { } catch (UnsupportedOperationException | IOException e) { throw new IllegalArgumentException("Response " + response + " is invalid", e); } -return Collections.singleton(new Values(lookupResponse.getIndex(), lookupResponse.getType(), lookupResponse.getId(), lookupResponse.getSource())); +return Collections.singleton(new Values( +lookupResponse.getIndex(), +lookupResponse.getType(), +lookupResponse.getId(), +lookupResponse.getSource())); } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/8bbee40c/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/EsLookupResultOutput.java
[2/2] storm git commit: Merge branch 'STORM-3196' of https://github.com/jacobtolar/storm into STORM-3196-merge
Merge branch 'STORM-3196' of https://github.com/jacobtolar/storm into STORM-3196-merge Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2875c838 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2875c838 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2875c838 Branch: refs/heads/master Commit: 2875c838a9577ca81e119de6601a0f5d0184b663 Parents: 88d19b0 5aff465 Author: Jungtaek Lim Authored: Mon Aug 20 14:12:26 2018 +0900 Committer: Jungtaek Lim Committed: Mon Aug 20 14:12:26 2018 +0900 -- .../src/jvm/org/apache/storm/command/ListTopologies.java | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) --
[1/2] storm git commit: [STORM-3196] Add "owner" column to "storm list" output
Repository: storm Updated Branches: refs/heads/master 88d19b098 -> 2875c838a [STORM-3196] Add "owner" column to "storm list" output Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5aff4658 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5aff4658 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5aff4658 Branch: refs/heads/master Commit: 5aff46584b8ee22fa8e80ef47f9cc385e5f11e14 Parents: 4c42ee3 Author: Jacob Tolar Authored: Wed Aug 15 16:46:10 2018 -0500 Committer: Jacob Tolar Committed: Wed Aug 15 16:46:10 2018 -0500 -- .../src/jvm/org/apache/storm/command/ListTopologies.java | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/5aff4658/storm-core/src/jvm/org/apache/storm/command/ListTopologies.java -- diff --git a/storm-core/src/jvm/org/apache/storm/command/ListTopologies.java b/storm-core/src/jvm/org/apache/storm/command/ListTopologies.java index 4fe2982..13d9691 100644 --- a/storm-core/src/jvm/org/apache/storm/command/ListTopologies.java +++ b/storm-core/src/jvm/org/apache/storm/command/ListTopologies.java @@ -21,7 +21,7 @@ import org.slf4j.LoggerFactory; public class ListTopologies { private static final Logger LOG = LoggerFactory.getLogger(ListTopologies.class); -private static final String MSG_FORMAT = "%-20s %-10s %-10s %-12s %-12s %-20s\n"; +private static final String MSG_FORMAT = "%-20s %-10s %-10s %-12s %-12s %-20s %-20s\n"; public static void main(String[] args) throws Exception { NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() { @@ -31,12 +31,13 @@ public class ListTopologies { if (topologies == null || topologies.isEmpty()) { System.out.println("No topologies running."); } else { -System.out.printf(MSG_FORMAT, "Topology_name", "Status", "Num_tasks", "Num_workers", "Uptime_secs", "Topology_Id"); +System.out.printf(MSG_FORMAT, "Topology_name", "Status", "Num_tasks", "Num_workers", "Uptime_secs", "Topology_Id", "Owner"); System.out.println(""); for (TopologySummary topology : topologies) { System.out.printf(MSG_FORMAT, topology.get_name(), topology.get_status(), topology.get_num_tasks(), topology.get_num_workers(), - topology.get_uptime_secs(), topology.get_id()); + topology.get_uptime_secs(), topology.get_id(), + topology.get_owner()); } } }
[1/5] storm git commit: STORM-3184: Mask the plaintext passwords from the logs
Repository: storm Updated Branches: refs/heads/master c2931dab7 -> 4c42ee3d2 STORM-3184: Mask the plaintext passwords from the logs Introduce a `Password` config annotation and use it to mark configs that are sensitive and mask the values while logging. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9d36724e Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9d36724e Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9d36724e Branch: refs/heads/master Commit: 9d36724e88f54ae48fd892b001eeb76313adc9da Parents: 154173a Author: Arun Mahadevan Authored: Fri Aug 10 17:39:49 2018 -0700 Committer: Arun Mahadevan Committed: Fri Aug 10 17:41:19 2018 -0700 -- .../AbstractHadoopNimbusPluginAutoCreds.java| 4 ++- storm-client/pom.xml| 6 .../org/apache/storm/daemon/worker/Worker.java | 5 +-- .../jvm/org/apache/storm/utils/ConfigUtils.java | 37 .../storm/validation/ConfigValidation.java | 2 +- .../validation/ConfigValidationAnnotations.java | 6 .../java/org/apache/storm/DaemonConfig.java | 10 ++ .../org/apache/storm/daemon/nimbus/Nimbus.java | 4 +-- .../storm/daemon/supervisor/Supervisor.java | 2 +- .../java/org/apache/storm/DaemonConfigTest.java | 12 +++ 10 files changed, 81 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/9d36724e/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractHadoopNimbusPluginAutoCreds.java -- diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractHadoopNimbusPluginAutoCreds.java b/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractHadoopNimbusPluginAutoCreds.java index dee337e..0ddf381 100644 --- a/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractHadoopNimbusPluginAutoCreds.java +++ b/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractHadoopNimbusPluginAutoCreds.java @@ -25,6 +25,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.storm.security.INimbusCredentialPlugin; import org.apache.storm.security.auth.ICredentialsRenewer; +import org.apache.storm.utils.ConfigUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,7 +81,8 @@ public abstract class AbstractHadoopNimbusPluginAutoCreds protected void fillHadoopConfiguration(Map topologyConf, String configKey, Configuration configuration) { Map config = (Map) topologyConf.get(configKey); -LOG.info("TopoConf {}, got config {}, for configKey {}", topologyConf, config, configKey); +LOG.info("TopoConf {}, got config {}, for configKey {}", ConfigUtils.maskPasswords(topologyConf), +ConfigUtils.maskPasswords(config), configKey); if (config != null) { List resourcesToLoad = new ArrayList<>(); for (Map.Entry entry : config.entrySet()) { http://git-wip-us.apache.org/repos/asf/storm/blob/9d36724e/storm-client/pom.xml -- diff --git a/storm-client/pom.xml b/storm-client/pom.xml index 7c28cce..c925ffd 100644 --- a/storm-client/pom.xml +++ b/storm-client/pom.xml @@ -97,6 +97,12 @@ + +com.google.guava +guava +${guava.version} + + org.mockito http://git-wip-us.apache.org/repos/asf/storm/blob/9d36724e/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java -- diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java index 233dfac..9f8428b 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java @@ -132,7 +132,7 @@ public class Worker implements Shutdownable, DaemonCommon { public void start() throws Exception { LOG.info("Launching worker for {} on {}:{} with id {} and conf {}", topologyId, assignmentId, port, workerId, - conf); + ConfigUtils.maskPasswords(conf)); // because in local mode, its not a separate // process. supervisor will register it in this case // if ConfigUtils.isLocalMode(conf) returns false then it is in distributed mode. @@ -278,7 +278,8 @@ public class Worker implements Shutdownable, DaemonCommon { setupFlushTupleTimer(topologyConf, newExecutors); setupBackPressureCheckTimer(topologyConf); -
[2/2] storm git commit: Merge branch 'STORM-3184' of https://github.com/arunmahadevan/storm into STORM-3184-1.x-merge
Merge branch 'STORM-3184' of https://github.com/arunmahadevan/storm into STORM-3184-1.x-merge Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4f8b0e39 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4f8b0e39 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4f8b0e39 Branch: refs/heads/1.x-branch Commit: 4f8b0e39baf12b7e6993d6785822a2a01745a9cb Parents: 7c4ae00 fc942ee Author: Jungtaek Lim Authored: Tue Aug 14 23:24:39 2018 +0900 Committer: Jungtaek Lim Committed: Tue Aug 14 23:24:39 2018 +0900 -- .../apache/storm/common/AbstractAutoCreds.java | 4 ++- .../src/clj/org/apache/storm/daemon/nimbus.clj | 6 ++-- .../src/clj/org/apache/storm/daemon/worker.clj | 6 ++-- storm-core/src/jvm/org/apache/storm/Config.java | 9 ++ .../storm/daemon/supervisor/Supervisor.java | 2 +- .../jvm/org/apache/storm/utils/ConfigUtils.java | 31 .../validation/ConfigValidationAnnotations.java | 7 + .../org/apache/storm/utils/ConfigUtilsTest.java | 12 8 files changed, 69 insertions(+), 8 deletions(-) --
[2/5] storm git commit: STORM-3184: remove guava dependency
STORM-3184: remove guava dependency Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0fe39c88 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0fe39c88 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0fe39c88 Branch: refs/heads/master Commit: 0fe39c8844f1d7e508b4f314942e26ee66bb503b Parents: 9d36724 Author: Arun Mahadevan Authored: Sat Aug 11 09:41:43 2018 -0700 Committer: Arun Mahadevan Committed: Sat Aug 11 10:28:55 2018 -0700 -- storm-client/pom.xml| 6 .../org/apache/storm/daemon/worker/Worker.java | 4 +-- .../jvm/org/apache/storm/utils/ConfigUtils.java | 38 +++- .../org/apache/storm/daemon/nimbus/Nimbus.java | 2 +- .../java/org/apache/storm/DaemonConfigTest.java | 2 +- 5 files changed, 33 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/0fe39c88/storm-client/pom.xml -- diff --git a/storm-client/pom.xml b/storm-client/pom.xml index c925ffd..7c28cce 100644 --- a/storm-client/pom.xml +++ b/storm-client/pom.xml @@ -97,12 +97,6 @@ - -com.google.guava -guava -${guava.version} - - org.mockito http://git-wip-us.apache.org/repos/asf/storm/blob/0fe39c88/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java -- diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java index 9f8428b..c0dfc0a 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java @@ -278,8 +278,8 @@ public class Worker implements Shutdownable, DaemonCommon { setupFlushTupleTimer(topologyConf, newExecutors); setupBackPressureCheckTimer(topologyConf); -LOG.info("Worker has topology config {}", Utils.redactValue(ConfigUtils.maskPasswords(topologyConf), -Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD)); +LOG.info("Worker has topology config {}", ConfigUtils.maskPasswords(Utils.redactValue(topologyConf, +Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD))); LOG.info("Worker {} for storm {} on {}:{} has finished loading", workerId, topologyId, assignmentId, port); return this; } http://git-wip-us.apache.org/repos/asf/storm/blob/0fe39c88/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java -- diff --git a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java index 5fefcea..064365d 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java +++ b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java @@ -26,9 +26,9 @@ import java.util.Map; import java.util.Random; import java.util.Set; import java.util.function.BooleanSupplier; +import java.util.function.Supplier; import java.util.stream.Collectors; -import com.google.common.collect.Maps; import org.apache.storm.Config; import org.apache.storm.daemon.supervisor.AdvancedFSOps; import org.apache.storm.generated.StormTopology; @@ -79,14 +79,34 @@ public class ConfigUtils { return oldInstance; } -public static Map maskPasswords(final Map conf) { -Maps.EntryTransformer maskPasswords = -new Maps.EntryTransformer() { -public Object transformEntry(String key, Object value) { -return passwordConfigKeys.contains(key) ? "*" : value; -} -}; -return Maps.transformEntries(conf, maskPasswords); +private static class MaskedConf implements Supplier> { +private final Map conf; + +MaskedConf(Map conf) { +this.conf = conf; +} + +@Override +public Map get() { +Map res = new HashMap<>(); +for (Map.Entry e : conf.entrySet()) { +if (passwordConfigKeys.contains(e.getKey())) { +res.put(e.getKey(), "*"); +} else { +res.put(e.getKey(), e.getValue()); +} +} +return res; +} + +@Override +public String toString() { +return get().toString(); +} +} + +public static Supplier> maskPasswords(Map conf) { +return new MaskedConf(conf); } public static boolean isLocalMode(Map conf) {
[1/2] storm git commit: STORM-3184: Mask the plaintext passwords from the logs
Repository: storm Updated Branches: refs/heads/1.x-branch 7c4ae00ad -> 4f8b0e39b STORM-3184: Mask the plaintext passwords from the logs Introduce a `Password` config annotation and use it to mark configs that are sensitive and mask the values while logging. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fc942ee1 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fc942ee1 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fc942ee1 Branch: refs/heads/1.x-branch Commit: fc942ee10d86649db9e9b8ce3dc0a04ea23439ce Parents: 7c4ae00 Author: Arun Mahadevan Authored: Tue Aug 7 12:13:54 2018 -0700 Committer: Arun Mahadevan Committed: Wed Aug 8 11:52:02 2018 -0700 -- .../apache/storm/common/AbstractAutoCreds.java | 4 ++- .../src/clj/org/apache/storm/daemon/nimbus.clj | 6 ++-- .../src/clj/org/apache/storm/daemon/worker.clj | 6 ++-- storm-core/src/jvm/org/apache/storm/Config.java | 9 ++ .../storm/daemon/supervisor/Supervisor.java | 2 +- .../jvm/org/apache/storm/utils/ConfigUtils.java | 31 .../validation/ConfigValidationAnnotations.java | 7 + .../org/apache/storm/utils/ConfigUtilsTest.java | 12 8 files changed, 69 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/fc942ee1/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractAutoCreds.java -- diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractAutoCreds.java b/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractAutoCreds.java index 7b2fc2d..2ce99aa 100644 --- a/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractAutoCreds.java +++ b/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractAutoCreds.java @@ -28,6 +28,7 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.storm.security.INimbusCredentialPlugin; import org.apache.storm.security.auth.IAutoCredentials; import org.apache.storm.security.auth.ICredentialsRenewer; +import org.apache.storm.utils.ConfigUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -149,7 +150,8 @@ public abstract class AbstractAutoCreds implements IAutoCredentials, ICredential protected void fillHadoopConfiguration(Map topoConf, String configKey, Configuration configuration) { Map config = (Map) topoConf.get(configKey); -LOG.info("TopoConf {}, got config {}, for configKey {}", topoConf, config, configKey); +LOG.info("TopoConf {}, got config {}, for configKey {}", ConfigUtils.maskPasswords(topoConf), +ConfigUtils.maskPasswords(config), configKey); if (config != null) { List resourcesToLoad = new ArrayList<>(); for (Map.Entry entry : config.entrySet()) { http://git-wip-us.apache.org/repos/asf/storm/blob/fc942ee1/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj -- diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index 850867e..fc89ac4 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -61,7 +61,7 @@ (:use [org.apache.storm.daemon common]) (:use [org.apache.storm config]) (:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms]) - (:import [org.apache.storm.utils VersionInfo Time] + (:import [org.apache.storm.utils VersionInfo Time ConfigUtils] (org.apache.storm.metric ClusterMetricsConsumerExecutor) (org.apache.storm.metric.api IClusterMetricsConsumer$ClusterInfo DataPoint IClusterMetricsConsumer$SupervisorInfo) (org.apache.storm Config) @@ -1748,7 +1748,7 @@ " (storm-" (.get_storm_version topology) " JDK-" (.get_jdk_version topology) ") with conf " - (redact-value storm-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)) + (redact-value (ConfigUtils/maskPasswords storm-conf) STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)) ;; lock protects against multiple topologies being submitted at once and ;; cleanup thread killing topology in b/w assignment and starting the topology (locking (:submit-lock nimbus) @@ -2463,7 +2463,7 @@ (defserverfn service-handler [conf inimbus] (.prepare inimbus conf (master-inimbus-dir conf)) - (log-message "Starting Nimbus with conf " conf) + (log-message "Starting Nimbus with conf " (ConfigUtils/maskPasswords conf)) (let [nimbus (nimbus-data conf inimbus)
[3/5] storm git commit: Revert "STORM-3184: remove guava dependency"
Revert "STORM-3184: remove guava dependency" This reverts commit 0fe39c8844f1d7e508b4f314942e26ee66bb503b. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3358effa Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3358effa Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3358effa Branch: refs/heads/master Commit: 3358effa636d7e405971f3dc0ab19ed9ec8ae76d Parents: 0fe39c8 Author: Arun Mahadevan Authored: Sat Aug 11 20:58:49 2018 -0700 Committer: Arun Mahadevan Committed: Sat Aug 11 20:58:49 2018 -0700 -- storm-client/pom.xml| 6 .../org/apache/storm/daemon/worker/Worker.java | 4 +-- .../jvm/org/apache/storm/utils/ConfigUtils.java | 38 +--- .../org/apache/storm/daemon/nimbus/Nimbus.java | 2 +- .../java/org/apache/storm/DaemonConfigTest.java | 2 +- 5 files changed, 19 insertions(+), 33 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/3358effa/storm-client/pom.xml -- diff --git a/storm-client/pom.xml b/storm-client/pom.xml index 7c28cce..c925ffd 100644 --- a/storm-client/pom.xml +++ b/storm-client/pom.xml @@ -97,6 +97,12 @@ + +com.google.guava +guava +${guava.version} + + org.mockito http://git-wip-us.apache.org/repos/asf/storm/blob/3358effa/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java -- diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java index c0dfc0a..9f8428b 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java @@ -278,8 +278,8 @@ public class Worker implements Shutdownable, DaemonCommon { setupFlushTupleTimer(topologyConf, newExecutors); setupBackPressureCheckTimer(topologyConf); -LOG.info("Worker has topology config {}", ConfigUtils.maskPasswords(Utils.redactValue(topologyConf, -Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD))); +LOG.info("Worker has topology config {}", Utils.redactValue(ConfigUtils.maskPasswords(topologyConf), +Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD)); LOG.info("Worker {} for storm {} on {}:{} has finished loading", workerId, topologyId, assignmentId, port); return this; } http://git-wip-us.apache.org/repos/asf/storm/blob/3358effa/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java -- diff --git a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java index 064365d..5fefcea 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java +++ b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java @@ -26,9 +26,9 @@ import java.util.Map; import java.util.Random; import java.util.Set; import java.util.function.BooleanSupplier; -import java.util.function.Supplier; import java.util.stream.Collectors; +import com.google.common.collect.Maps; import org.apache.storm.Config; import org.apache.storm.daemon.supervisor.AdvancedFSOps; import org.apache.storm.generated.StormTopology; @@ -79,34 +79,14 @@ public class ConfigUtils { return oldInstance; } -private static class MaskedConf implements Supplier> { -private final Map conf; - -MaskedConf(Map conf) { -this.conf = conf; -} - -@Override -public Map get() { -Map res = new HashMap<>(); -for (Map.Entry e : conf.entrySet()) { -if (passwordConfigKeys.contains(e.getKey())) { -res.put(e.getKey(), "*"); -} else { -res.put(e.getKey(), e.getValue()); -} -} -return res; -} - -@Override -public String toString() { -return get().toString(); -} -} - -public static Supplier> maskPasswords(Map conf) { -return new MaskedConf(conf); +public static Map maskPasswords(final Map conf) { +Maps.EntryTransformer maskPasswords = +new Maps.EntryTransformer() { +public Object transformEntry(String key, Object value) { +return passwordConfigKeys.contains(key) ? "*" : value; +} +}; +return Maps.transformEntries(conf, maskPasswords); } public static boolean isLocalMode(Map conf)
[5/5] storm git commit: Merge branch 'STORM-3184-master' of https://github.com/arunmahadevan/storm into STORM-3184-master
Merge branch 'STORM-3184-master' of https://github.com/arunmahadevan/storm into STORM-3184-master Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4c42ee3d Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4c42ee3d Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4c42ee3d Branch: refs/heads/master Commit: 4c42ee3d259d5d90a4e7d3445d1c119601eec6c7 Parents: c2931da c12ddb3 Author: Jungtaek Lim Authored: Tue Aug 14 23:24:04 2018 +0900 Committer: Jungtaek Lim Committed: Tue Aug 14 23:24:04 2018 +0900 -- .../AbstractHadoopNimbusPluginAutoCreds.java| 4 ++- .../src/jvm/org/apache/storm/Config.java| 2 ++ .../org/apache/storm/daemon/worker/Worker.java | 4 +-- .../jvm/org/apache/storm/utils/ConfigUtils.java | 37 .../storm/validation/ConfigValidation.java | 2 +- .../validation/ConfigValidationAnnotations.java | 6 .../java/org/apache/storm/DaemonConfig.java | 10 ++ .../org/apache/storm/daemon/nimbus/Nimbus.java | 5 ++- .../storm/daemon/supervisor/Supervisor.java | 2 +- .../java/org/apache/storm/DaemonConfigTest.java | 12 +++ 10 files changed, 76 insertions(+), 8 deletions(-) --
[4/5] storm git commit: STORM-3184: use shaded guava and remove the usage of redactValue
STORM-3184: use shaded guava and remove the usage of redactValue Change-Id: Idb0775f1c1b91cbc648087e7752783f119e034b3 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c12ddb3f Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c12ddb3f Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c12ddb3f Branch: refs/heads/master Commit: c12ddb3f7d6c28a35bd4c90b0df48f48715c3f6e Parents: 3358eff Author: Arun Mahadevan Authored: Sat Aug 11 21:15:10 2018 -0700 Committer: Arun Mahadevan Committed: Sat Aug 11 21:15:10 2018 -0700 -- storm-client/pom.xml | 6 -- storm-client/src/jvm/org/apache/storm/Config.java | 2 ++ .../src/jvm/org/apache/storm/daemon/worker/Worker.java | 3 +-- storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java | 2 +- .../src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java | 3 +-- 5 files changed, 5 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/c12ddb3f/storm-client/pom.xml -- diff --git a/storm-client/pom.xml b/storm-client/pom.xml index c925ffd..7c28cce 100644 --- a/storm-client/pom.xml +++ b/storm-client/pom.xml @@ -97,12 +97,6 @@ - -com.google.guava -guava -${guava.version} - - org.mockito http://git-wip-us.apache.org/repos/asf/storm/blob/c12ddb3f/storm-client/src/jvm/org/apache/storm/Config.java -- diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java index 318a130..4582d04 100644 --- a/storm-client/src/jvm/org/apache/storm/Config.java +++ b/storm-client/src/jvm/org/apache/storm/Config.java @@ -36,6 +36,7 @@ import org.apache.storm.validation.ConfigValidation.MetricRegistryValidator; import org.apache.storm.validation.ConfigValidation.MetricReportersValidator; import org.apache.storm.validation.ConfigValidationAnnotations.CustomValidator; import org.apache.storm.validation.ConfigValidationAnnotations.NotNull; +import org.apache.storm.validation.ConfigValidationAnnotations.Password; import org.apache.storm.validation.ConfigValidationAnnotations.isBoolean; import org.apache.storm.validation.ConfigValidationAnnotations.isImplementationOfClass; import org.apache.storm.validation.ConfigValidationAnnotations.isInteger; @@ -1146,6 +1147,7 @@ public class Config extends HashMap { * authentication. */ @isString +@Password public static final String STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD = "storm.zookeeper.topology.auth.payload"; /** * The cluster Zookeeper authentication scheme to use, e.g. "digest". Defaults to no authentication. http://git-wip-us.apache.org/repos/asf/storm/blob/c12ddb3f/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java -- diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java index 9f8428b..816114c 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java @@ -278,8 +278,7 @@ public class Worker implements Shutdownable, DaemonCommon { setupFlushTupleTimer(topologyConf, newExecutors); setupBackPressureCheckTimer(topologyConf); -LOG.info("Worker has topology config {}", Utils.redactValue(ConfigUtils.maskPasswords(topologyConf), -Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD)); +LOG.info("Worker has topology config {}", ConfigUtils.maskPasswords(topologyConf)); LOG.info("Worker {} for storm {} on {}:{} has finished loading", workerId, topologyId, assignmentId, port); return this; } http://git-wip-us.apache.org/repos/asf/storm/blob/c12ddb3f/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java -- diff --git a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java index 5fefcea..2219fee 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java +++ b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java @@ -28,7 +28,7 @@ import java.util.Set; import java.util.function.BooleanSupplier; import java.util.stream.Collectors; -import com.google.common.collect.Maps; +import org.apache.storm.shade.com.google.common.collect.Maps; import org.apache.storm.Config; import
[1/2] storm git commit: STORM-3183: Fix for visualization on Storm API
Repository: storm Updated Branches: refs/heads/master 154173a70 -> c2931dab7 STORM-3183: Fix for visualization on Storm API Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/078e8cc2 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/078e8cc2 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/078e8cc2 Branch: refs/heads/master Commit: 078e8cc239788bbec14a0e38492ed683341af7dc Parents: 16e5008 Author: Govind Menon Authored: Thu Aug 9 13:51:10 2018 -0500 Committer: Govind Menon Committed: Fri Aug 10 17:25:05 2018 -0500 -- .../org/apache/storm/daemon/ui/UIHelpers.java | 49 ++-- 1 file changed, 24 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/078e8cc2/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java -- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java index d6dc409..a080599 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java @@ -1348,12 +1348,12 @@ public class UIHelpers { */ public static Map getStatMapFromExecutorSummary(ExecutorSummary executorSummary) { Map result = new HashMap(); -result.put("host", executorSummary.get_host()); -result.put("port", executorSummary.get_port()); -result.put("uptime_secs", executorSummary.get_uptime_secs()); -result.put("transferred", null); +result.put(":host", executorSummary.get_host()); +result.put(":port", executorSummary.get_port()); +result.put(":uptime_secs", executorSummary.get_uptime_secs()); +result.put(":transferred", null); if (executorSummary.is_set_stats()) { -result.put("transferred", sanitizeTransferredStats(executorSummary.get_stats().get_transferred())); +result.put(":transferred", sanitizeTransferredStats(executorSummary.get_stats().get_transferred())); } return result; } @@ -1366,12 +1366,11 @@ public class UIHelpers { * @return getInputMap */ public static Map getInputMap(Map.Entry entryInput) { - Map result = new HashMap(); -result.put("component", entryInput.getKey().get_componentId()); -result.put("stream", entryInput.getKey().get_streamId()); -result.put("sani-stream", sanitizeStreamName(entryInput.getKey().get_streamId())); -result.put("grouping", entryInput.getValue().getSetField().getFieldName()); +result.put(":component", entryInput.getKey().get_componentId()); +result.put(":stream", entryInput.getKey().get_streamId()); +result.put(":sani-stream", sanitizeStreamName(entryInput.getKey().get_streamId())); +result.put(":grouping", entryInput.getValue().getSetField().getFieldName()); return result; } @@ -1402,17 +1401,17 @@ public class UIHelpers { String spoutComponentId = spoutSpecMapEntry.getKey(); if (spoutSummaries.containsKey(spoutComponentId)) { Map spoutData = new HashMap(); -spoutData.put("type", "spout"); -spoutData.put("capacity", 0); +spoutData.put(":type", "spout"); +spoutData.put(":capacity", 0); Map spoutStreamsStats = StatsUtil.spoutStreamsStats(spoutSummaries.get(spoutComponentId), true); -spoutData.put("latency", spoutStreamsStats.get("complete-latencies").get(window)); -spoutData.put("transferred", spoutStreamsStats.get("transferred").get(window)); -spoutData.put("stats", spoutSummaries.get( +spoutData.put(":latency", spoutStreamsStats.get("complete-latencies").get(window)); +spoutData.put(":transferred", spoutStreamsStats.get("transferred").get(window)); +spoutData.put(":stats", spoutSummaries.get( spoutComponentId).stream().map( UIHelpers::getStatMapFromExecutorSummary).collect(Collectors.toList())); -spoutData.put("link", UIHelpers.urlFormat("/component.html?id=%s_id=%s", spoutComponentId, topoId)); +spoutData.put(":link", UIHelpers.urlFormat("/component.html?id=%s_id=%s", spoutComponentId, topoId)); -result.put("inputs", +spoutData.put(":inputs", spoutSpecMapEntry.getValue().get_common().get_inputs().entrySet().stream().map( UIHelpers::getInputMap).collect(Collectors.toList()) ); @@ -1422,19
[2/2] storm git commit: Merge branch 'STORM-3183' of https://github.com/govind-menon/storm into STORM-3183-merge
Merge branch 'STORM-3183' of https://github.com/govind-menon/storm into STORM-3183-merge Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c2931dab Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c2931dab Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c2931dab Branch: refs/heads/master Commit: c2931dab720a2f6dcafa80a2a771c970302dca86 Parents: 154173a 078e8cc Author: Jungtaek Lim Authored: Sat Aug 11 20:55:44 2018 +0900 Committer: Jungtaek Lim Committed: Sat Aug 11 20:55:44 2018 +0900 -- .../org/apache/storm/daemon/ui/UIHelpers.java | 49 ++-- 1 file changed, 24 insertions(+), 25 deletions(-) --
[3/8] storm git commit: STORM-3181: Fix for completeLatency in TopologyStats
STORM-3181: Fix for completeLatency in TopologyStats Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/66a1aa34 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/66a1aa34 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/66a1aa34 Branch: refs/heads/master Commit: 66a1aa34141aa6f5b8b04c73935fd140cf47987e Parents: 73382d9 Author: Govind Menon Authored: Tue Aug 7 15:37:56 2018 -0500 Committer: Govind Menon Committed: Tue Aug 7 15:37:56 2018 -0500 -- .../src/main/java/org/apache/storm/daemon/ui/UIHelpers.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/66a1aa34/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java -- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java index 4096763..9567239 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java @@ -1148,7 +1148,7 @@ public class UIHelpers { temp.put("window", window); temp.put("emitted", emittedStatDisplayMap.get(window)); temp.put("transferred", transferred.get(window)); -temp.put("completeLatency", completeLatency.get(window)); +temp.put("completeLatency", StatsUtil.floatStr(completeLatency.get(getWindowHint(window; temp.put("acked", acked.get(window)); temp.put("failed", failed.get(window));
[8/8] storm git commit: Merge branch 'STORM-3179' of https://github.com/govind-menon/storm into STORM-3179-merge
Merge branch 'STORM-3179' of https://github.com/govind-menon/storm into STORM-3179-merge Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/16e50084 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/16e50084 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/16e50084 Branch: refs/heads/master Commit: 16e500844481aaf8179ea786d00030b1f18729b0 Parents: 70e379f bc1078d Author: Jungtaek Lim Authored: Fri Aug 10 19:27:46 2018 +0900 Committer: Jungtaek Lim Committed: Fri Aug 10 19:27:46 2018 +0900 -- .../org/apache/storm/daemon/ui/UIHelpers.java | 26 +--- 1 file changed, 12 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/16e50084/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java --
[4/8] storm git commit: STORM-3179: Fixing getNimbusSummary in Storm API
STORM-3179: Fixing getNimbusSummary in Storm API Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bc1078d9 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bc1078d9 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bc1078d9 Branch: refs/heads/master Commit: bc1078d9114da5840c133a56ffda5e1d917b023d Parents: 73382d9 Author: Govind Menon Authored: Tue Aug 7 17:38:23 2018 -0500 Committer: Govind Menon Committed: Tue Aug 7 17:38:23 2018 -0500 -- .../org/apache/storm/daemon/ui/UIHelpers.java | 26 +--- 1 file changed, 12 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/bc1078d9/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java -- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java index 4096763..8e88042 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java @@ -1909,21 +1909,19 @@ public class UIHelpers { for (NimbusSummary nimbusSummary : nimbusSummaries) { Map nimbusSummaryMap = new HashMap(); -if (nimbusSeeds.contains(nimbusSummary.get_host() + ":" + String.valueOf(nimbusSummary.get_port( { -nimbusSummaryMap.put("host", nimbusSummary.get_host()); -nimbusSummaryMap.put("port", nimbusSummary.get_port()); -String status = "Not a Leader"; -if (nimbusSummary.is_isLeader()) { -status = "Leader"; -} -nimbusSummaryMap.put("status", status); -nimbusSummaryMap.put("version", nimbusSummary.get_version()); -nimbusSummaryMap.put("nimbusUpTimeSeconds", nimbusSummary.get_uptime_secs()); -nimbusSummaryMap.put("nimbusUpTime", prettyUptimeSec(nimbusSummary.get_uptime_secs())); -nimbusSummaryMap.put("nimbusLogLink", getNimbusLogLink(nimbusSummary.get_host(), config)); -resultSummaryList.add(nimbusSummaryMap); -nimbusSeeds.remove(nimbusSummary.get_host() + ":" + String.valueOf(nimbusSummary.get_port())); +nimbusSummaryMap.put("host", nimbusSummary.get_host()); +nimbusSummaryMap.put("port", nimbusSummary.get_port()); +String status = "Not a Leader"; +if (nimbusSummary.is_isLeader()) { +status = "Leader"; } +nimbusSummaryMap.put("status", status); +nimbusSummaryMap.put("version", nimbusSummary.get_version()); +nimbusSummaryMap.put("nimbusUpTimeSeconds", nimbusSummary.get_uptime_secs()); +nimbusSummaryMap.put("nimbusUpTime", prettyUptimeSec(nimbusSummary.get_uptime_secs())); +nimbusSummaryMap.put("nimbusLogLink", getNimbusLogLink(nimbusSummary.get_host(), config)); +resultSummaryList.add(nimbusSummaryMap); +nimbusSeeds.remove(nimbusSummary.get_host() + ":" + String.valueOf(nimbusSummary.get_port())); } for (String nimbusSeed : nimbusSeeds) {
[1/8] storm git commit: STORM-3180 Total executors in Cluster Summary in main UI page is not exposed even a topology is running
Repository: storm Updated Branches: refs/heads/master c9efe3be9 -> 16e500844 STORM-3180 Total executors in Cluster Summary in main UI page is not exposed even a topology is running * rename the field of /cluster/summary output: totalExecutors to executorsTotal Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fdfeb141 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fdfeb141 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fdfeb141 Branch: refs/heads/master Commit: fdfeb14166301fed82b045f41934b693e95577c4 Parents: 1828a17 Author: Jungtaek Lim Authored: Mon Aug 6 22:07:53 2018 +0900 Committer: Jungtaek Lim Committed: Mon Aug 6 22:07:53 2018 +0900 -- .../src/main/java/org/apache/storm/daemon/ui/UIHelpers.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/fdfeb141/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java -- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java index 4096763..31c5423 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java @@ -571,7 +571,7 @@ public class UIHelpers { result.put("slotsTotal", totalSlots); result.put("slotsFree", totalSlots - usedSlots); result.put("tasksTotal", totalTasks); -result.put("totalExecutors", totalExecutors); +result.put("executorsTotal", totalExecutors); result.put("totalMem", supervisorTotalMemory); result.put("totalCpu", supervisorTotalCpu);
[2/8] storm git commit: STORM-3182: Removing superfluous topo id check from owner resource API request
STORM-3182: Removing superfluous topo id check from owner resource API request Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ecec9cf5 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ecec9cf5 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ecec9cf5 Branch: refs/heads/master Commit: ecec9cf52e61ffd889e51309b1bc2c1df99d7366 Parents: 73382d9 Author: Govind Menon Authored: Mon Aug 6 15:44:57 2018 -0500 Committer: Govind Menon Committed: Mon Aug 6 15:44:57 2018 -0500 -- .../org/apache/storm/daemon/ui/filters/AuthorizedUserFilter.java | 2 +- .../org/apache/storm/daemon/ui/resources/StormApiResource.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/ecec9cf5/storm-webapp/src/main/java/org/apache/storm/daemon/ui/filters/AuthorizedUserFilter.java -- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/ui/filters/AuthorizedUserFilter.java b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/filters/AuthorizedUserFilter.java index c094b82..9a4a2a1 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/ui/filters/AuthorizedUserFilter.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/filters/AuthorizedUserFilter.java @@ -99,7 +99,7 @@ public class AuthorizedUserFilter implements ContainerRequestFilter { } @Override -public void filter(ContainerRequestContext containerRequestContext) throws IOException { +public void filter(ContainerRequestContext containerRequestContext) { AuthNimbusOp annotation = resourceInfo.getResourceMethod().getAnnotation(AuthNimbusOp.class); if (annotation == null) { return; http://git-wip-us.apache.org/repos/asf/storm/blob/ecec9cf5/storm-webapp/src/main/java/org/apache/storm/daemon/ui/resources/StormApiResource.java -- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/ui/resources/StormApiResource.java b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/resources/StormApiResource.java index 6b3c014..12a7f0e 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/ui/resources/StormApiResource.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/resources/StormApiResource.java @@ -194,7 +194,7 @@ public class StormApiResource { */ @GET @Path("/owner-resources/{id}") -@AuthNimbusOp(value = "getOwnerResourceSummaries", needsTopoId = true) +@AuthNimbusOp(value = "getOwnerResourceSummaries") @Produces("application/json") public Response getOwnerResource(@PathParam("id") String id, @QueryParam(callbackParameterName) String callback) throws TException {
[6/8] storm git commit: Merge branch 'STORM-3182' of https://github.com/govind-menon/storm into STORM-3182-merge
Merge branch 'STORM-3182' of https://github.com/govind-menon/storm into STORM-3182-merge Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/44cf25fa Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/44cf25fa Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/44cf25fa Branch: refs/heads/master Commit: 44cf25fa9987b8f91f0147580ab245d73de51928 Parents: e270a1e ecec9cf Author: Jungtaek Lim Authored: Fri Aug 10 19:27:01 2018 +0900 Committer: Jungtaek Lim Committed: Fri Aug 10 19:27:01 2018 +0900 -- .../org/apache/storm/daemon/ui/filters/AuthorizedUserFilter.java | 2 +- .../org/apache/storm/daemon/ui/resources/StormApiResource.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) --
[7/8] storm git commit: Merge branch 'STORM-3181' of https://github.com/govind-menon/storm into STORM-3181-merge
Merge branch 'STORM-3181' of https://github.com/govind-menon/storm into STORM-3181-merge Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/70e379f7 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/70e379f7 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/70e379f7 Branch: refs/heads/master Commit: 70e379f7beef92ec6292119161ade8f4b3d20bdd Parents: 44cf25f 66a1aa3 Author: Jungtaek Lim Authored: Fri Aug 10 19:27:24 2018 +0900 Committer: Jungtaek Lim Committed: Fri Aug 10 19:27:24 2018 +0900 -- .../src/main/java/org/apache/storm/daemon/ui/UIHelpers.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/70e379f7/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java --
[5/8] storm git commit: Merge branch 'STORM-3180' of https://github.com/HeartSaVioR/storm into STORM-3180-merge
Merge branch 'STORM-3180' of https://github.com/HeartSaVioR/storm into STORM-3180-merge Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e270a1ee Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e270a1ee Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e270a1ee Branch: refs/heads/master Commit: e270a1eeab20d794dcc60a697c440b65b03aecc2 Parents: c9efe3b fdfeb14 Author: Jungtaek Lim Authored: Fri Aug 10 19:26:33 2018 +0900 Committer: Jungtaek Lim Committed: Fri Aug 10 19:26:33 2018 +0900 -- .../src/main/java/org/apache/storm/daemon/ui/UIHelpers.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --
storm git commit: Update KEYS: add key information for "Jungtaek Lim"
Repository: storm Updated Branches: refs/heads/master 5f386b1da -> 9cc5b72c1 Update KEYS: add key information for "Jungtaek Lim" Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9cc5b72c Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9cc5b72c Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9cc5b72c Branch: refs/heads/master Commit: 9cc5b72c1fae78c60061b9f3c3f3d41277ca93cf Parents: 5f386b1 Author: Jungtaek Lim Authored: Sun Jul 29 22:52:17 2018 +0900 Committer: Jungtaek Lim Committed: Sun Jul 29 22:52:17 2018 +0900 -- KEYS | 79 +++ 1 file changed, 79 insertions(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/9cc5b72c/KEYS -- diff --git a/KEYS b/KEYS index d3bf1da..4b90e4d 100644 --- a/KEYS +++ b/KEYS @@ -79,3 +79,82 @@ Be/J4vDCRO3I+6qUpQwfNaUzjcHBaStzlV35mu/6Xeq7Kkr5VVmqqwT53Xig1laL Vw== =E8Vm -END PGP PUBLIC KEY BLOCK- +pub rsa2048 2016-01-21 [SC] + 3D3E9ACA18C5C9FE181A504B37D6756C2F471B9E +uid [ultimate] Jungtaek Lim (HeartSaVioR) +sig 337D6756C2F471B9E 2018-02-26 Jungtaek Lim (HeartSaVioR) +uid [ultimate] Jungtaek Lim (HeartSaVioR) +sig 337D6756C2F471B9E 2016-01-21 Jungtaek Lim (HeartSaVioR) +sig 950B0EE138256E78 2018-02-27 Gwan-gyeong Mun +sig BBE44E923A970AB7 2018-07-25 Francis Chuang +sig DDB6E9812AD3FAE3 2018-07-25 Julian Hyde (CODE SIGNING KEY) +sub rsa2048 2016-01-21 [E] +sig 37D6756C2F471B9E 2016-01-21 Jungtaek Lim (HeartSaVioR) + +-BEGIN PGP PUBLIC KEY BLOCK- + +mQENBFagQBgBCADVBeBdgam+O2IqAjf30flo0Yhm9Of4BgJMijbuypqLWT6xSrZh +Z4ZB6/FFbLyhkdKcmkzc+G29aQOYUG6jF94g69ZG4k8Zw+lLSA0r52lvS7zzvsfB ++hFi2w/R57+v7AyHSbAmEJqd5PGpmyyexLMFkcGWZFyyJgrJBQXOBmWpNKEJFIIw +wWla1SWbcEtfaPkTb1VzsUiDcw/hz9HMkgAx2zGaXnGE98ngXNi+RJ7BHEw0r1HD +e/BpmYbAgI2tQmCkC/Q+raeKPsVngho0aAnl5Vb3mUw0Ft+cyAjrdgcEpLYYGHNX +YUoXgfXZ3wMJeA939i591pdAY+JO9//nChBfABEBAAG0Lkp1bmd0YWVrIExpbSAo +SGVhcnRTYVZpb1IpIDxrYWJod2FuQGdtYWlsLmNvbT6JATgEEwECACIFAlagQBgC +GwMGCwkIBwMCBhUIAgkKCwQWAgMBAh4BAheAAAoJEDfWdWwvRxueDqsH/Awcpuud +9bIpwZgfn3rpJyPjZVfIFY6ELYFG+mTXDFum5Lau9F0YRIseu8A+COuKYnC1FZtG +Q/C1rKePddmSV3aeOiorJtfabYAe9yGY+5jy6Fq3fQsJ2FpYRrJjR8xfDLGf38T6 +E9H/FotSFr7lnE2zSkhx/mYk4KhAWQOPls9W8yNTAENtg8vkybbA8egutUbwT5ic +c6I5QsBA3+rSgCnJ3wb82wDEpLHmF4OZzs8/uhbqmIFtzE6TeFQ6UtPH/CEX1Cvk +SKhRz86PzVxWkt4jqthjxHihUKlfw3nfr4QRs4HN0+1RWHNVyzEmyXFSpo4LwmFI +n7DvYxWN3Y6FZ42JATMEEAEIAB0WIQT5+o44yp7gqj5QWr6VCw7hOCVueAUCWpTe +pAAKCRCVCw7hOCVueO1sCACeujHcQjV6uDcU3UZQ+7+54FnS9y7LEdJ9CYv5PkiM +ZWlCjHE/3lVAO5Vl9VoQSxsadoZyNEFpBTiQnrapMJjeYa0fW0525RMD23oqUk+t +ULMlY2bflCSzrRGjstLrtFMgNxzfDWJN0EYzS36sxiniI6S3gQdOkbG4jLe9jZSP +0K8OEF6k9uZm8soRaUVgVH4D+76ktNSPP2wFF6bfglbVpqom1wqoRcDQVDovSBcJ +Bra+ls6C7YAiEuH5sa3j8oEc7l6DB37X7z2mQMP1+SzrkSQAasmh2X+x+RUzittJ +Fc1oksyA4AAt0zE++ZlN5XLp/bSO+o4BIDqQwIv8pxX2iQIcBBABCgAGBQJbWQTj +AAoJELvkTpI6lwq3/BQP/RR7oQhAT7yyBv1pD7IqSvtWEkodmCm5unlf1vCvFJSC +GOFpkQbkkg4d1evgwdkCUfhE9+YopphAT4lBVLdAQWHpOuDYv9Lt6MeTH++0Uopn ++JRu8ZgVnpRg/CZmSBbucMjTvSRAT2y9BGePNh1SemC5FUP2+e1zK18m/ERio3J4 +lIjDDmgEFfqkzGFONLH30pQHDxEFSCZzpndG021xloPPm79T/GYm/GBoX0PoNxW7 +4VA+ugzkCBenJQcBJ0PWTus43Zlog7I86uJH02a40W/Q2G3moL+sNXiuRnp0ch8m +Mi7aHl6D9z4SFlTrgvdpIwc6riBdzWumkd0JlUadO5HCTZ+o/4xYVBikGqadI9OJ +twhrHBu/rcuyaAbA0ncmt+G7+WqxSvC2GMoEKsOwUTCz/gTqF6K4uOdk24RJ9pzY +xqYeyrKlCm95O1+xsb74av2S7Z7xaB7QFNpWb0YWsfy0ERNoHZWsEdfxI/q6aFSG +Ta52s8XtmCSkScjVIXBFjkX/EC3MxATC12yXGspOI10ZIhwuIt6BKL9OlKVgBN7n +PkfRLaqnn80VNutwDkRhiNaSe99pkemFMYMUeAEG59V5NPZHbA8aLW81KY0sfvlj +StZPWd8Pqw0ase/86paHHX0jLNy89ibT72HlH4904kZ7a0pOjvc4/nfQNFVDG51B +iQIzBBABCgAdFiEE3TFPXhdzfXalVNzo3bbpgSrT+uMFAltZBEcACgkQ3bbpgSrT ++uNjmg//XcMDyrtHdQChOCLE1X4bZEkCI6ZzuKogORAHFuueoDQXwhOnMzxfaIWt +xzBtO25RqA8Fe0PNz3t2W7e3OlViyOpPvidQu80sJz6Rv4fkY0ZQ+aejTYfakGgQ +gP7apdK8O14WZXYzrHGtJVKK+7x2uSBYTeTUKjRs8JMYQgqklSwgzz91lFlikXGp +FKgeeKCXacm5ZXrwzAAahVqAJsWqGVO0XSGjOx7jrZrSgXntvoAHvkZ3R67r6FcB +oyFllntlgI3nq9PVir8lWbY1C8GjtuzvcC1hUvY68Ct4Uk6IdCnQtvsDXcrT7Jy6 +8J/1Pl8Mi9gPxng94g2oyquLvxjWnjNXwdYMCAjiQAyB7RtUXI+AcJS6Fcr81vpB +WfOha3xK6zTwNxTGfbV1X1SqEEV/GTh6KxBIlEwTTuleiCI+CsYUElxjyuNNXc7c +/gai8FqVi9xAx2czLPld+FSiia65w6WHymd1E97gunW2LXQECpzoxhL21x7854fi +PB25R8/PAmvolENoKjHM9UYbEitrzM7Bom/6IAkQu7sS5opJvO1CTgQoyLGwUYp0 +5l23gIzp5LqU3ZSLvKAPwnlBlPOmQYX3EF+pw2cYAGjHpjWz7JV4pYUPiSSKrNVb +6Xjg4lCsum/mxr+fD2KK3yYASY/t7780nXGvWMlwy0sqr5l2kHS0L0p1bmd0YWVr +IExpbSAoSGVhcnRTYVZpb1IpIDxrYWJod2FuQGFwYWNoZS5vcmc+iQFOBBMBCAA4 +FiEEPT6ayhjFyf4YGlBLN9Z1bC9HG54FAlqTyMMCGwMFCwkIBwIGFQgJCgsCBBYC +AwECHgECF4AACgkQN9Z1bC9HG57sJwf8CFMnhpITcpQLWf8g1WlRUMx84uZJvssj +3RrzXYm/I0/czumcq9yuauKMMipVxTYGIwampH6SIp6Spa3+WUOqKVrIiElkBI9Y
[2/2] storm git commit: Merge branch 'agresch_kerblog' of https://github.com/agresch/storm into STORM-3158-merge
Merge branch 'agresch_kerblog' of https://github.com/agresch/storm into STORM-3158-merge Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5f386b1d Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5f386b1d Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5f386b1d Branch: refs/heads/master Commit: 5f386b1da383f6b4039d81074e5ce58c191483bf Parents: b9dbf8c de59923 Author: Jungtaek Lim Authored: Sun Jul 29 22:17:42 2018 +0900 Committer: Jungtaek Lim Committed: Sun Jul 29 22:17:42 2018 +0900 -- .../apache/storm/security/auth/kerberos/ServerCallbackHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --
[1/2] storm git commit: STORM-3158 improve logging on login failure when kerberos is misconfigured
Repository: storm Updated Branches: refs/heads/master b9dbf8c97 -> 5f386b1da STORM-3158 improve logging on login failure when kerberos is misconfigured Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/de59923e Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/de59923e Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/de59923e Branch: refs/heads/master Commit: de59923e37911885975fc097d4313fae14ef5f37 Parents: a7e817b Author: Aaron Gresch Authored: Fri Jul 20 16:02:43 2018 -0500 Committer: Aaron Gresch Committed: Fri Jul 20 16:02:43 2018 -0500 -- .../apache/storm/security/auth/kerberos/ServerCallbackHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/de59923e/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java -- diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java index acece33..139ad54 100644 --- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java +++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java @@ -77,7 +77,7 @@ public class ServerCallbackHandler implements CallbackHandler { } if (pc != null) { -LOG.warn("No password found for user: {}", userName); +LOG.error("No password found for user: {}, validate klist matches jaas conf", userName); } if (ac != null) {
[1/2] storm git commit: MINOR - Make raw type assignment type safe
Repository: storm Updated Branches: refs/heads/master 82961751f -> b9dbf8c97 MINOR - Make raw type assignment type safe Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5084c975 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5084c975 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5084c975 Branch: refs/heads/master Commit: 5084c9756a4d792c4ac3a2a64e0d0ec2da8530e5 Parents: 7a0fa47 Author: Hugo Louro Authored: Tue Jul 24 18:42:23 2018 -0700 Committer: Hugo Louro Committed: Tue Jul 24 18:42:23 2018 -0700 -- .../src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/5084c975/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java -- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 9aeba6b..d6befd5 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -156,7 +156,7 @@ public class KafkaSpout extends BaseRichSpout { private void registerMetric() { LOG.info("Registering Spout Metrics"); -kafkaOffsetMetric = new KafkaOffsetMetric(() -> Collections.unmodifiableMap(offsetManagers), () -> consumer); +kafkaOffsetMetric = new KafkaOffsetMetric<>(() -> Collections.unmodifiableMap(offsetManagers), () -> consumer); context.registerMetric("kafkaOffset", kafkaOffsetMetric, kafkaSpoutConfig.getMetricsTimeBucketSizeInSecs()); }
[2/2] storm git commit: Merge branch 'master_skc_RawTypeAssignSafe' of https://github.com/hmcl/storm-apache into PR-2775-merge
Merge branch 'master_skc_RawTypeAssignSafe' of https://github.com/hmcl/storm-apache into PR-2775-merge Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b9dbf8c9 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b9dbf8c9 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b9dbf8c9 Branch: refs/heads/master Commit: b9dbf8c9700c847e7d4fe0fd1e8b51b1fac9ef6c Parents: 8296175 5084c97 Author: Jungtaek Lim Authored: Sun Jul 29 22:15:29 2018 +0900 Committer: Jungtaek Lim Committed: Sun Jul 29 22:15:29 2018 +0900 -- .../src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --
[1/2] storm git commit: STORM-3121: Fix flaky metrics tests in storm-core
Repository: storm Updated Branches: refs/heads/1.x-branch 335b388aa -> 93732d311 STORM-3121: Fix flaky metrics tests in storm-core Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7e271799 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7e271799 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7e271799 Branch: refs/heads/1.x-branch Commit: 7e271799adeeea24d4ac7b3477f4988c34cf7daa Parents: 335b388 Author: Stig Rohde Døssing Authored: Sun Jun 24 12:55:11 2018 +0200 Committer: Jungtaek Lim Committed: Sun Jul 29 17:39:53 2018 +0900 -- pom.xml | 7 + storm-core/pom.xml | 5 + .../test/clj/org/apache/storm/metrics_test.clj | 169 ++- storm-core/test/resources/log4j2-test.xml | 1 + 4 files changed, 101 insertions(+), 81 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/7e271799/pom.xml -- diff --git a/pom.xml b/pom.xml index 3ff1741..cbbce20 100644 --- a/pom.xml +++ b/pom.xml @@ -265,6 +265,7 @@ 2.3 0.9.3 4.11 +3.1.0 2.5.1 2.1.7 1.3 @@ -945,6 +946,12 @@ test +org.awaitility +awaitility +${awaitility.version} +test + + org.hamcrest hamcrest-core ${hamcrest.version} http://git-wip-us.apache.org/repos/asf/storm/blob/7e271799/storm-core/pom.xml -- diff --git a/storm-core/pom.xml b/storm-core/pom.xml index f5c58fe..3e6282c 100644 --- a/storm-core/pom.xml +++ b/storm-core/pom.xml @@ -280,6 +280,11 @@ test +org.awaitility +awaitility +test + + org.hamcrest hamcrest-library test http://git-wip-us.apache.org/repos/asf/storm/blob/7e271799/storm-core/test/clj/org/apache/storm/metrics_test.clj -- diff --git a/storm-core/test/clj/org/apache/storm/metrics_test.clj b/storm-core/test/clj/org/apache/storm/metrics_test.clj index ba48ea9..b20ad74 100644 --- a/storm-core/test/clj/org/apache/storm/metrics_test.clj +++ b/storm-core/test/clj/org/apache/storm/metrics_test.clj @@ -15,16 +15,14 @@ ;; limitations under the License. (ns org.apache.storm.metrics-test (:use [clojure test]) - (:import [org.apache.storm.topology TopologyBuilder]) - (:import [org.apache.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus]) - (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount -TestAggregatesCounter TestConfBolt AckFailMapTracker PythonShellMetricsBolt PythonShellMetricsSpout]) - (:import [org.apache.storm.task ShellBolt]) - (:import [org.apache.storm.spout ShellSpout]) - (:import [org.apache.storm.metric.api CountMetric IMetricsConsumer$DataPoint IMetricsConsumer$TaskInfo]) - (:import [org.apache.storm.metric.api.rpc CountShellMetric]) - (:import [org.apache.storm.utils Utils]) - + (:import [org.apache.storm.testing AckFailMapTracker PythonShellMetricsBolt PythonShellMetricsSpout]) + (:import [org.apache.storm.metric.api CountMetric]) + (:import [org.apache.storm Testing]) + (:import [org.awaitility Awaitility]) + (:import [org.awaitility.core ConditionEvaluationListener ConditionTimeoutException]) + (:import [java.util.concurrent TimeUnit Callable]) + (:import [org.hamcrest CoreMatchers]) + (:use [org.apache.storm testing clojure config]) (:use [org.apache.storm.daemon common]) (:use [org.apache.storm.metric testing]) @@ -87,12 +85,22 @@ (first) ;; pick first task in the list, ignore other tasks' metric data. (second) (or []))) - -(defmacro assert-buckets! [comp-id metric-name expected cluster] - `(do - (let [N# (count ~expected)] - (wait-for-atleast-N-buckets! N# ~comp-id ~metric-name ~cluster) - (is (= ~expected (subvec (lookup-bucket-by-comp-id-&-metric-name! ~comp-id ~metric-name) 0 N#)) + +(defn assert-metric-running-sum! [comp-id metric-name expected min-buckets cluster] + (try +(do + (wait-for-atleast-N-buckets! min-buckets comp-id metric-name cluster) + (.until +(.atMost + (.conditionEvaluationListener +(.pollInterval (Awaitility/with) 10 TimeUnit/MILLISECONDS) +(reify ConditionEvaluationListener (conditionEvaluated [this condition] + (advance-cluster-time cluster 1 + TEST-TIMEOUT-MS
[2/2] storm git commit: Merge branch 'STORM-3121-1.x' of https://github.com/HeartSaVioR/storm into STORM-3121-1.x-merge
Merge branch 'STORM-3121-1.x' of https://github.com/HeartSaVioR/storm into STORM-3121-1.x-merge Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/93732d31 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/93732d31 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/93732d31 Branch: refs/heads/1.x-branch Commit: 93732d3114dc84a9baeb8f482b7b401be2d6ee08 Parents: 335b388 7e27179 Author: Jungtaek Lim Authored: Sun Jul 29 21:50:07 2018 +0900 Committer: Jungtaek Lim Committed: Sun Jul 29 21:50:07 2018 +0900 -- pom.xml | 7 + storm-core/pom.xml | 5 + .../test/clj/org/apache/storm/metrics_test.clj | 169 ++- storm-core/test/resources/log4j2-test.xml | 1 + 4 files changed, 101 insertions(+), 81 deletions(-) --
[2/2] storm git commit: Merge branch 'STORM-3161-1.x' of https://github.com/HeartSaVioR/storm into STORM-3161-1.x-merge
Merge branch 'STORM-3161-1.x' of https://github.com/HeartSaVioR/storm into STORM-3161-1.x-merge Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/335b388a Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/335b388a Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/335b388a Branch: refs/heads/1.x-branch Commit: 335b388aa6983ae22fb9411c738001a8084cff1f Parents: 3b5f9e7 f1e9a79 Author: Jungtaek Lim Authored: Sun Jul 29 16:15:33 2018 +0900 Committer: Jungtaek Lim Committed: Sun Jul 29 16:15:33 2018 +0900 -- storm-core/src/clj/org/apache/storm/testing.clj | 7 ++- 1 file changed, 6 insertions(+), 1 deletion(-) --
[1/2] storm git commit: STORM-3161 Local mode should force setting min replication count to 1
Repository: storm Updated Branches: refs/heads/1.x-branch 3b5f9e7c7 -> 335b388aa STORM-3161 Local mode should force setting min replication count to 1 When topology.min.replication.count is set to more than 1, nimbus in local mode never achieve condition for replication, hence stuck on handling blobs. We should force set it to 1 in local mode to avoid this situation. Also set storm.home to new temporary directory to avoid using actual log directory. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f1e9a794 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f1e9a794 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f1e9a794 Branch: refs/heads/1.x-branch Commit: f1e9a7941d6c90ab687a49eee49eb272e771d94b Parents: 3b5f9e7 Author: Jungtaek Lim Authored: Fri Jul 27 17:31:29 2018 +0900 Committer: Jungtaek Lim Committed: Fri Jul 27 17:31:29 2018 +0900 -- storm-core/src/clj/org/apache/storm/testing.clj | 7 ++- 1 file changed, 6 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/f1e9a794/storm-core/src/clj/org/apache/storm/testing.clj -- diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj index db6b94b..41bc903 100644 --- a/storm-core/src/clj/org/apache/storm/testing.clj +++ b/storm-core/src/clj/org/apache/storm/testing.clj @@ -140,6 +140,7 @@ [zk-port zk-handle] (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS) (zk/mk-inprocess-zookeeper zk-tmp)) nimbus-tmp (local-temp-path) +storm-home-tmp (local-temp-path) daemon-conf (merge (read-storm-config) {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true ZMQ-LINGER-MILLIS 0 @@ -147,11 +148,15 @@ TOPOLOGY-TRIDENT-BATCH-EMIT-INTERVAL-MILLIS 50 STORM-CLUSTER-MODE "local" BLOBSTORE-SUPERUSER (System/getProperty "user.name") -BLOBSTORE-DIR nimbus-tmp} +BLOBSTORE-DIR nimbus-tmp +TOPOLOGY-MIN-REPLICATION-COUNT 1} (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS) {STORM-ZOOKEEPER-PORT zk-port STORM-ZOOKEEPER-SERVERS ["localhost"]}) daemon-conf) +; setting storm.home to set log dir as well as worker-artifacts as tmp directory +_ (local-mkdirs storm-home-tmp) +_ (System/setProperty "storm.home" storm-home-tmp) port-counter (mk-counter supervisor-slot-port-min) nimbus (nimbus/service-handler (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)
[2/2] storm git commit: Merge branch 'STORM-3161' of https://github.com/HeartSaVioR/storm into STORM-3161-merge
Merge branch 'STORM-3161' of https://github.com/HeartSaVioR/storm into STORM-3161-merge Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/82961751 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/82961751 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/82961751 Branch: refs/heads/master Commit: 82961751f9c09bccd776e6ad4c67006a5a93ce65 Parents: 7a0fa47 882cfff Author: Jungtaek Lim Authored: Sun Jul 29 16:14:50 2018 +0900 Committer: Jungtaek Lim Committed: Sun Jul 29 16:14:50 2018 +0900 -- storm-server/src/main/java/org/apache/storm/LocalCluster.java | 1 + 1 file changed, 1 insertion(+) --
[1/2] storm git commit: STORM-3161 Local mode should force setting min replication count to 1
Repository: storm Updated Branches: refs/heads/master 7a0fa47f7 -> 82961751f STORM-3161 Local mode should force setting min replication count to 1 When topology.min.replication.count is set to more than 1, nimbus in local mode never achieve condition for replication, hence stuck on handling blobs. We should force set it to 1 in local mode to avoid this situation. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/882cfff9 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/882cfff9 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/882cfff9 Branch: refs/heads/master Commit: 882cfff9f8249248ff09b5898389523512ede40b Parents: a7e817b Author: Jungtaek Lim Authored: Fri Jul 27 16:58:37 2018 +0900 Committer: Jungtaek Lim Committed: Fri Jul 27 17:03:57 2018 +0900 -- storm-server/src/main/java/org/apache/storm/LocalCluster.java | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/882cfff9/storm-server/src/main/java/org/apache/storm/LocalCluster.java -- diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java index 0e314c9..11f765b 100644 --- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java +++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java @@ -208,6 +208,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { conf.put(Config.STORM_CLUSTER_MODE, "local"); conf.put(Config.BLOBSTORE_SUPERUSER, System.getProperty("user.name")); conf.put(Config.BLOBSTORE_DIR, nimbusTmp.getPath()); +conf.put(Config.TOPOLOGY_MIN_REPLICATION_COUNT, 1); InProcessZookeeper zookeeper = null; if (!builder.daemonConf.containsKey(Config.STORM_ZOOKEEPER_SERVERS)) {
[1/7] storm git commit: STORM-2953: Remove storm-kafka
Repository: storm Updated Branches: refs/heads/master 3883bf74c -> a7e817bcd http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/pom.xml -- diff --git a/pom.xml b/pom.xml index 503924c..ea5b616 100644 --- a/pom.xml +++ b/pom.xml @@ -315,14 +315,9 @@ 4.2.0 2.9.4 - -0.8.2.2 -kafka_2.10 - - + 0.10.1.0 - org.apache.storm.testing.IntegrationTest, org.apache.storm.testing.PerformanceTest **/Test*.java, **/*Test.java, **/*TestCase.java @@ -365,7 +360,6 @@ external/storm-autocreds -external/storm-kafka external/storm-hdfs external/storm-hdfs-blobstore external/storm-hbase @@ -397,7 +391,6 @@ examples/storm-redis-examples examples/storm-opentsdb-examples examples/storm-solr-examples -examples/storm-kafka-examples examples/storm-kafka-client-examples examples/storm-jdbc-examples examples/storm-hdfs-examples @@ -1085,28 +1078,6 @@ calcite-core ${calcite.version} - - - -org.apache.kafka -${storm.kafka.artifact.id} -${storm.kafka.version} -provided - - -org.apache.zookeeper -zookeeper - - -log4j -log4j - - -org.slf4j -slf4j-log4j12 - - - org.apache.kafka kafka-clients http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java -- diff --git a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java index db4c1d7..25a5312 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java +++ b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java @@ -18,7 +18,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.storm.Config; import org.apache.storm.generated.ComponentCommon; import org.apache.storm.generated.ComponentObject; import org.apache.storm.generated.SpoutSpec; @@ -49,9 +48,7 @@ public class TopologySpoutLag { className = getClassNameFromComponentObject(componentObject); logger.debug("spout classname: {}", className); if (className.endsWith("storm.kafka.spout.KafkaSpout")) { -result.put(spout.getKey(), getLagResultForNewKafkaSpout(spout.getKey(), spoutSpec, topologyConf)); -} else if (className.endsWith("storm.kafka.KafkaSpout")) { -result.put(spout.getKey(), getLagResultForOldKafkaSpout(spout.getKey(), spoutSpec, topologyConf)); +result.put(spout.getKey(), getLagResultForNewKafkaSpout(spout.getKey(), spoutSpec)); } } catch (Exception e) { logger.warn("Exception thrown while getting lag for spout id: " + spout.getKey() + " and spout class: " + className); @@ -93,46 +90,8 @@ public class TopologySpoutLag { } return commands; } - -private static List getCommandLineOptionsForOldKafkaSpout(Map jsonConf, Map topologyConf) { -logger.debug("json configuration: {}", jsonConf); - -List commands = new ArrayList<>(); -String configKeyPrefix = "config."; -commands.add("-o"); -commands.add("-t"); -commands.add((String) jsonConf.get(configKeyPrefix + "topics")); -commands.add("-n"); -commands.add((String) jsonConf.get(configKeyPrefix + "zkRoot")); -String zkServers = (String) jsonConf.get(configKeyPrefix + "zkServers"); -if (zkServers == null || zkServers.isEmpty()) { -StringBuilder zkServersBuilder = new StringBuilder(); -Integer zkPort = ((Number) topologyConf.get(Config.STORM_ZOOKEEPER_PORT)).intValue(); -for (String zkServer : (List) topologyConf.get(Config.STORM_ZOOKEEPER_SERVERS)) { -zkServersBuilder.append(zkServer + ":" + zkPort + ","); -} -zkServers = zkServersBuilder.toString(); -} -commands.add("-z"); -commands.add(zkServers); -if (jsonConf.get(configKeyPrefix + "leaders") != null) { -commands.add("-p"); -commands.add((String) jsonConf.get(configKeyPrefix + "partitions")); -commands.add("-l"); -
[6/7] storm git commit: STORM-2953: Remove storm-kafka
STORM-2953: Remove storm-kafka Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e58ac3e0 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e58ac3e0 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e58ac3e0 Branch: refs/heads/master Commit: e58ac3e033670aeb83e8543ab7b8779227ca10d8 Parents: 7f992a6 Author: Stig Rohde Døssing Authored: Tue Jul 17 18:29:08 2018 +0200 Committer: Stig Rohde Døssing Committed: Wed Jul 18 11:01:13 2018 +0200 -- docs/index.md | 2 +- docs/storm-kafka-client.md | 7 +- docs/storm-kafka.md | 399 -- examples/storm-kafka-examples/pom.xml | 110 - .../kafka/trident/KafkaProducerTopology.java| 75 .../trident/TridentKafkaConsumerTopology.java | 42 -- .../trident/TridentKafkaRandomStrings.java | 82 .../kafka/trident/TridentKafkaTopology.java | 83 external/storm-kafka-monitor/pom.xml| 5 - .../storm/kafka/monitor/KafkaOffsetLagUtil.java | 298 +- .../kafka/monitor/OldKafkaSpoutOffsetQuery.java | 127 -- external/storm-kafka/README.md | 382 - external/storm-kafka/pom.xml| 125 -- .../src/jvm/org/apache/storm/kafka/Broker.java | 79 .../jvm/org/apache/storm/kafka/BrokerHosts.java | 20 - .../storm/kafka/ByteBufferSerializer.java | 41 -- .../storm/kafka/DynamicBrokersReader.java | 208 -- .../kafka/DynamicPartitionConnections.java | 91 - .../ExponentialBackoffMsgRetryManager.java | 201 - .../storm/kafka/FailedFetchException.java | 24 -- .../storm/kafka/FailedMsgRetryManager.java | 77 .../org/apache/storm/kafka/IntSerializer.java | 42 -- .../jvm/org/apache/storm/kafka/KafkaConfig.java | 49 --- .../jvm/org/apache/storm/kafka/KafkaError.java | 38 -- .../jvm/org/apache/storm/kafka/KafkaSpout.java | 255 .../jvm/org/apache/storm/kafka/KafkaUtils.java | 288 - .../org/apache/storm/kafka/KeyValueScheme.java | 21 - .../kafka/KeyValueSchemeAsMultiScheme.java | 35 -- .../storm/kafka/MessageMetadataScheme.java | 21 - .../MessageMetadataSchemeAsMultiScheme.java | 35 -- .../jvm/org/apache/storm/kafka/Partition.java | 85 .../storm/kafka/PartitionCoordinator.java | 23 -- .../apache/storm/kafka/PartitionManager.java| 405 --- .../jvm/org/apache/storm/kafka/SpoutConfig.java | 58 --- .../apache/storm/kafka/StaticCoordinator.java | 50 --- .../jvm/org/apache/storm/kafka/StaticHosts.java | 33 -- .../storm/kafka/StaticPartitionConnections.java | 46 --- .../storm/kafka/StringKeyValueScheme.java | 32 -- .../kafka/StringMessageAndMetadataScheme.java | 36 -- .../storm/kafka/StringMultiSchemeWithTopic.java | 41 -- .../org/apache/storm/kafka/StringScheme.java| 44 -- .../kafka/TopicOffsetOutOfRangeException.java | 26 -- .../org/apache/storm/kafka/ZkCoordinator.java | 127 -- .../src/jvm/org/apache/storm/kafka/ZkHosts.java | 31 -- .../src/jvm/org/apache/storm/kafka/ZkState.java | 112 - .../org/apache/storm/kafka/bolt/KafkaBolt.java | 172 .../FieldNameBasedTupleToKafkaMapper.java | 43 -- .../kafka/bolt/mapper/TupleToKafkaMapper.java | 27 -- .../bolt/selector/DefaultTopicSelector.java | 29 -- .../bolt/selector/FieldIndexTopicSelector.java | 43 -- .../bolt/selector/FieldNameTopicSelector.java | 44 -- .../kafka/bolt/selector/KafkaTopicSelector.java | 20 - .../apache/storm/kafka/trident/Coordinator.java | 46 --- .../storm/kafka/trident/DefaultCoordinator.java | 26 -- .../trident/GlobalPartitionInformation.java | 113 -- .../storm/kafka/trident/IBatchCoordinator.java | 21 - .../storm/kafka/trident/IBrokerReader.java | 24 -- .../apache/storm/kafka/trident/MaxMetric.java | 35 -- .../kafka/trident/OpaqueTridentKafkaSpout.java | 62 --- .../storm/kafka/trident/StaticBrokerReader.java | 44 -- .../trident/TransactionalTridentKafkaSpout.java | 50 --- .../storm/kafka/trident/TridentKafkaConfig.java | 32 -- .../kafka/trident/TridentKafkaEmitter.java | 306 -- .../storm/kafka/trident/TridentKafkaState.java | 110 - .../kafka/trident/TridentKafkaStateFactory.java | 57 --- .../kafka/trident/TridentKafkaUpdater.java | 25 -- .../storm/kafka/trident/ZkBrokerReader.java | 79 .../FieldNameBasedTupleToKafkaMapper.java | 36 -- .../mapper/TridentTupleToKafkaMapper.java | 22 - .../trident/selector/DefaultTopicSelector.java | 29 -- .../trident/selector/KafkaTopicSelector.java| 20 - .../storm/kafka/DynamicBrokersReaderTest.java | 245 --- .../ExponentialBackoffMsgRetryManagerTest.java | 279
[3/7] storm git commit: STORM-2953: Remove storm-kafka
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java -- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java deleted file mode 100644 index 7276ef6..000 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ - -package org.apache.storm.kafka.bolt.mapper; - -import org.apache.storm.tuple.Tuple; - -public class FieldNameBasedTupleToKafkaMapper implements TupleToKafkaMapper { - -public static final String BOLT_KEY = "key"; -public static final String BOLT_MESSAGE = "message"; -public String boltKeyField; -public String boltMessageField; - -public FieldNameBasedTupleToKafkaMapper() { -this(BOLT_KEY, BOLT_MESSAGE); -} - -public FieldNameBasedTupleToKafkaMapper(String boltKeyField, String boltMessageField) { -this.boltKeyField = boltKeyField; -this.boltMessageField = boltMessageField; -} - -@Override -public K getKeyFromTuple(Tuple tuple) { -//for backward compatibility, we return null when key is not present. -return tuple.contains(boltKeyField) ? (K) tuple.getValueByField(boltKeyField) : null; -} - -@Override -public V getMessageFromTuple(Tuple tuple) { -return (V) tuple.getValueByField(boltMessageField); -} -} http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java -- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java deleted file mode 100644 index 7012e6b..000 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ - -package org.apache.storm.kafka.bolt.mapper; - -import java.io.Serializable; -import org.apache.storm.tuple.Tuple; - -/** - * as the really verbose name suggests this interface mapps a storm tuple to kafka key and message. - * @param type of key. - * @param type of value. - */ -public interface TupleToKafkaMapper extends Serializable { -K getKeyFromTuple(Tuple tuple); - -V getMessageFromTuple(Tuple tuple); -} http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java -- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java deleted file mode 100644 index d1784b0..000 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See
[2/7] storm git commit: STORM-2953: Remove storm-kafka
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java -- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java deleted file mode 100644 index 5fcee28..000 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java +++ /dev/null @@ -1,279 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ - -package org.apache.storm.kafka; - -import org.apache.storm.utils.Time; -import org.apache.storm.utils.Time.SimulatedTime; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -public class ExponentialBackoffMsgRetryManagerTest { - -private static final Long TEST_OFFSET = 101L; -private static final Long TEST_OFFSET2 = 102L; -private static final Long TEST_OFFSET3 = 105L; -private static final Long TEST_NEW_OFFSET = 103L; -private SimulatedTime st; - -@Before -public void setup() throws Exception { -st = new SimulatedTime(); -} - -@After -public void cleanup() throws Exception { -if (st != null) { -st.close(); -st = null; -} -} - -@Test -public void testImmediateRetry() throws Exception { - - -ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE); -manager.failed(TEST_OFFSET); -Long next = manager.nextFailedMessageToRetry(); -assertEquals("expect test offset next available for retry", TEST_OFFSET, next); -assertTrue("message should be ready for retry immediately", manager.shouldReEmitMsg(TEST_OFFSET)); - -manager.retryStarted(TEST_OFFSET); - -manager.failed(TEST_OFFSET); -next = manager.nextFailedMessageToRetry(); -assertEquals("expect test offset next available for retry", TEST_OFFSET, next); -assertTrue("message should be ready for retry immediately", manager.shouldReEmitMsg(TEST_OFFSET)); -} - -@Test -public void testSingleDelay() throws Exception { -ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(100, 1d, 1000, Integer.MAX_VALUE); -manager.failed(TEST_OFFSET); -Time.advanceTime(5); -Long next = manager.nextFailedMessageToRetry(); -assertNull("expect no message ready for retry yet", next); -assertFalse("message should not be ready for retry yet", manager.shouldReEmitMsg(TEST_OFFSET)); - -Time.advanceTime(100); -next = manager.nextFailedMessageToRetry(); -assertEquals("expect test offset next available for retry", TEST_OFFSET, next); -assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET)); -} - -@Test -public void testExponentialBackoff() throws Exception { -final long initial = 10; -final double mult = 2d; -ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(initial, mult, initial * 10, Integer.MAX_VALUE); - -long expectedWaitTime = initial; -for (long i = 0L; i < 3L; ++i) { -manager.failed(TEST_OFFSET); - -Time.advanceTime((expectedWaitTime + 1L) / 2L); -assertFalse("message should not be ready for retry yet", manager.shouldReEmitMsg(TEST_OFFSET)); - -Time.advanceTime((expectedWaitTime + 1L) / 2L); -Long next = manager.nextFailedMessageToRetry(); -assertEquals("expect test offset next available for retry", TEST_OFFSET, next); -assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET)); - -manager.retryStarted(TEST_OFFSET); -expectedWaitTime *= mult; -
[4/7] storm git commit: STORM-2953: Remove storm-kafka
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java -- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java deleted file mode 100644 index 38958b2..000 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java +++ /dev/null @@ -1,288 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ - -package org.apache.storm.kafka; - -import com.google.common.base.Preconditions; -import java.io.IOException; -import java.net.ConnectException; -import java.net.SocketTimeoutException; -import java.nio.ByteBuffer; -import java.nio.channels.UnresolvedAddressException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; -import kafka.api.FetchRequest; -import kafka.api.FetchRequestBuilder; -import kafka.api.PartitionOffsetRequestInfo; -import kafka.common.TopicAndPartition; -import kafka.javaapi.FetchResponse; -import kafka.javaapi.OffsetRequest; -import kafka.javaapi.consumer.SimpleConsumer; -import kafka.javaapi.message.ByteBufferMessageSet; -import kafka.message.Message; -import org.apache.storm.kafka.trident.GlobalPartitionInformation; -import org.apache.storm.kafka.trident.IBrokerReader; -import org.apache.storm.kafka.trident.StaticBrokerReader; -import org.apache.storm.kafka.trident.ZkBrokerReader; -import org.apache.storm.metric.api.IMetric; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class KafkaUtils { - -private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class); -private static final int NO_OFFSET = -5; - -//suppress default constructor for noninstantiablility -private KafkaUtils() { -throw new AssertionError(); -} - -public static IBrokerReader makeBrokerReader(Map topoConf, KafkaConfig conf) { -if (conf.hosts instanceof StaticHosts) { -return new StaticBrokerReader(conf.topic, ((StaticHosts) conf.hosts).getPartitionInformation()); -} else { -return new ZkBrokerReader(topoConf, conf.topic, (ZkHosts) conf.hosts); -} -} - - -public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) { -long startOffsetTime = config.startOffsetTime; -return getOffset(consumer, topic, partition, startOffsetTime); -} - -public static long getOffset(SimpleConsumer consumer, String topic, int partition, long startOffsetTime) { -TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); -Map requestInfo = new HashMap(); -requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1)); -OffsetRequest request = new OffsetRequest( -requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId()); - -long[] offsets = consumer.getOffsetsBefore(request).offsets(topic, partition); -if (offsets.length > 0) { -return offsets[0]; -} else { -return NO_OFFSET; -} -} - -public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) -throws TopicOffsetOutOfRangeException, FailedFetchException, RuntimeException { -ByteBufferMessageSet msgs = null; -String topic = partition.topic; -int partitionId = partition.partition; -FetchRequestBuilder builder = new FetchRequestBuilder(); -FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes). - clientId(config.clientId).maxWait(config.fetchMaxWait).minBytes(config.minFetchByte).build(); -FetchResponse fetchResponse; -try { -fetchResponse = consumer.fetch(fetchRequest); -} catch (Exception e) { -if (e instanceof ConnectException || -e instanceof SocketTimeoutException || -e
[5/7] storm git commit: STORM-2953: Remove storm-kafka
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/README.md -- diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md deleted file mode 100644 index d25bf29..000 --- a/external/storm-kafka/README.md +++ /dev/null @@ -1,382 +0,0 @@ -Storm Kafka - - -Provides core Storm and Trident spout implementations for consuming data from Apache Kafka 0.8.x. - -## Spouts -We support both Trident and core Storm spouts. For both spout implementations, we use a BrokerHost interface that -tracks Kafka broker host to partition mapping and kafkaConfig that controls some Kafka related parameters. - -### BrokerHosts -In order to initialize your Kafka spout/emitter you need to construct an instance of the marker interface BrokerHosts. -Currently, we support the following two implementations: - - ZkHosts -ZkHosts is what you should use if you want to dynamically track Kafka broker to partition mapping. This class uses -Kafka's ZooKeeper entries to track brokerHost -> partition mapping. You can instantiate an object by calling -```java -public ZkHosts(String brokerZkStr, String brokerZkPath) -public ZkHosts(String brokerZkStr) -``` -Where brokerZkStr is just ip:port (e.g. localhost:2181). brokerZkPath is the root directory under which all the topics and -partition information is stored. By default this is /brokers which is what the default Kafka implementation uses. - -By default, the broker-partition mapping is refreshed every 60 seconds from ZooKeeper. If you want to change it, you -should set host.refreshFreqSecs to your chosen value. - - StaticHosts -This is an alternative implementation where broker -> partition information is static. In order to construct an instance -of this class, you need to first construct an instance of GlobalPartitionInformation. - -```java -Broker brokerForPartition0 = new Broker("localhost");//localhost:9092 -Broker brokerForPartition1 = new Broker("localhost", 9092);//localhost:9092 but we specified the port explicitly -Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 specified as one string. -GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation(); -partitionInfo.addPartition(0, brokerForPartition0);//mapping from partition 0 to brokerForPartition0 -partitionInfo.addPartition(1, brokerForPartition1);//mapping from partition 1 to brokerForPartition1 -partitionInfo.addPartition(2, brokerForPartition2);//mapping from partition 2 to brokerForPartition2 -StaticHosts hosts = new StaticHosts(partitionInfo); -``` - -### KafkaConfig -The second thing needed for constructing a kafkaSpout is an instance of KafkaConfig. -```java -public KafkaConfig(BrokerHosts hosts, String topic) -public KafkaConfig(BrokerHosts hosts, String topic, String clientId) -``` - -The BrokerHosts can be any implementation of BrokerHosts interface as described above. The topic is name of Kafka topic. -The optional ClientId is used as a part of the ZooKeeper path where the spout's current consumption offset is stored. - -There are 2 extensions of KafkaConfig currently in use. - -Spoutconfig is an extension of KafkaConfig that supports additional fields with ZooKeeper connection info and for controlling -behavior specific to KafkaSpout. The Zkroot will be used as root to store your consumer's offset. The id should uniquely -identify your spout. -```java -public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id); -``` -In addition to these parameters, SpoutConfig contains the following fields that control how KafkaSpout behaves: -```java -// setting for how often to save the current Kafka offset to ZooKeeper -public long stateUpdateIntervalMs = 2000; - -// Retry strategy for failed messages -public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName(); - -// Exponential back-off retry settings. These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt -// calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used. -// Initial delay between successive retries -public long retryInitialDelayMs = 0; -public double retryDelayMultiplier = 1.0; - -// Maximum delay between successive retries -public long retryDelayMaxMs = 60 * 1000; -// Failed message will be retried infinitely if retryLimit is less than zero. -public int retryLimit = -1; - -``` -Core KafkaSpout only accepts an instance of SpoutConfig. - -TridentKafkaConfig is another extension of KafkaConfig. -TridentKafkaEmitter only accepts TridentKafkaConfig. - -The KafkaConfig class also has bunch of public variables that controls your application's behavior. Here are defaults: -```java -public int fetchSizeBytes = 1024 *
[1/2] storm git commit: STORM-2972: Replace storm-kafka with storm-kafka-client in storm-sql-kafka
Repository: storm Updated Branches: refs/heads/master af42f434f -> 3883bf74c STORM-2972: Replace storm-kafka with storm-kafka-client in storm-sql-kafka Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7f992a6b Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7f992a6b Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7f992a6b Branch: refs/heads/master Commit: 7f992a6b7474267691e48fd2e5a64230cda41390 Parents: af42f43 Author: Stig Rohde Døssing Authored: Sun Jul 15 17:30:41 2018 +0200 Committer: Stig Rohde Døssing Committed: Tue Jul 17 17:59:36 2018 +0200 -- docs/storm-sql-example.md | 31 - docs/storm-sql-internal.md | 10 ++- docs/storm-sql-reference.md | 11 ++-- docs/storm-sql.md | 61 - sql/storm-sql-external/storm-sql-kafka/pom.xml | 7 +- .../sql/kafka/KafkaDataSourcesProvider.java | 69 .../kafka/RecordTranslatorSchemeAdapter.java| 48 ++ .../sql/kafka/TestKafkaDataSourcesProvider.java | 22 +-- 8 files changed, 158 insertions(+), 101 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/7f992a6b/docs/storm-sql-example.md -- diff --git a/docs/storm-sql-example.md b/docs/storm-sql-example.md index 63eb0b6..61f255b 100644 --- a/docs/storm-sql-example.md +++ b/docs/storm-sql-example.md @@ -9,7 +9,7 @@ This page is written by "how-to" style so you can follow the step and learn how ## Preparation -This page assumes that Apache Zookeeper, Apache Storm and Apache Kafka is installed locally and running with properly configured. +This page assumes that Apache Zookeeper, Apache Storm and Apache Kafka are installed locally and running properly configured. For convenience, this page assumes that Apache Kafka 0.10.0 is installed via `brew`. We'll use below tools to prepare the JSON data which will be fed to the input data source. @@ -147,8 +147,8 @@ In this example we'll filter error logs from entire logs and store them to anoth The content of script file is here: ``` -CREATE EXTERNAL TABLE APACHE_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS VARCHAR, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_US DOUBLE) LOCATION 'kafka://localhost:2181/brokers?topic=apache-logs' -CREATE EXTERNAL TABLE APACHE_ERROR_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS INT, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_ELAPSED_MS INT) LOCATION 'kafka://localhost:2181/brokers?topic=apache-error-logs' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}' +CREATE EXTERNAL TABLE APACHE_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS VARCHAR, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_US DOUBLE) LOCATION 'kafka://apache-logs?bootstrap-servers=localhost:9092' +CREATE EXTERNAL TABLE APACHE_ERROR_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS INT, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_ELAPSED_MS INT) LOCATION 'kafka://apache-error-logs?bootstrap-servers=localhost:9092' TBLPROPERTIES '{"producer":{"acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer"}}' INSERT INTO APACHE_ERROR_LOGS SELECT ID, REMOTE_IP, REQUEST_URL, REQUEST_METHOD, CAST(STATUS AS INT) AS STATUS_INT, REQUEST_HEADER_USER_AGENT, TIME_RECEIVED_UTC_ISOFORMAT, (TIME_US / 1000) AS TIME_ELAPSED_MS FROM APACHE_LOGS WHERE (CAST(STATUS AS INT) / 100) >= 4 ``` @@ -156,7 +156,7 @@ Save this file to `apache_log_error_filtering.sql`. Let's take a look at the script. -The first statement defines the table `APACHE_LOGS` which represents the input stream. The `LOCATION` clause specifies the ZkHost (`localhost:2181`), the path of the brokers in ZooKeeper (`/brokers`) and the topic (`apache-logs`). +The first statement defines the table `APACHE_LOGS` which represents the input stream. The `LOCATION` clause specifies the Kafka host (`localhost:9092`) and the topic (`apache-logs`). Note that Kafka data source requires primary key to be defined. That's why we put integer id for parsed JSON data. Similarly, the second statement specifies the table `APACHE_ERROR_LOGS` which represents the output stream. The `TBLPROPERTIES` clause specifies the configuration of
[6/7] storm git commit: STORM-2406 [Storm SQL] Change underlying API to Streams API
STORM-2406 [Storm SQL] Change underlying API to Streams API * This will enable us to provide windowed aggregation, join, etc. * Tuple-to-tuple is making more sense than micro-batch in these cases * Tested with several sql cases * Also bump Calcite version to 1.14.0 * Fix checkstyle issues a bit: not doing exhaustively * Update diagrams in storm-sql internal doc * Also add XML type of exported diagrams as well to restore diagram from draw.io Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c69a23cf Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c69a23cf Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c69a23cf Branch: refs/heads/master Commit: c69a23cfe661afd7862201d94b45282e4450b9ee Parents: c9e9a7c Author: Jungtaek Lim Authored: Fri Jul 13 17:06:01 2018 +0900 Committer: Jungtaek Lim Committed: Tue Jul 17 06:51:39 2018 +0900 -- ...ql-internal-example-exported-from-drawio.xml | 2 + docs/images/storm-sql-internal-example.png | Bin 28377 -> 63265 bytes ...l-internal-workflow-exported-from-drawio.xml | 2 + docs/images/storm-sql-internal-workflow.png | Bin 20020 -> 42408 bytes docs/storm-sql-internal.md | 16 +- docs/storm-sql-reference.md | 10 +- docs/storm-sql.md | 16 +- pom.xml | 2 +- sql/README.md | 183 +- sql/storm-sql-core/pom.xml | 24 + .../storm/sql/AbstractStreamsProcessor.java | 67 +++ .../storm/sql/AbstractTridentProcessor.java | 51 -- .../org/apache/storm/sql/StormSqlContext.java | 22 +- .../jvm/org/apache/storm/sql/StormSqlImpl.java | 12 +- .../sql/calcite/ParallelStreamableTable.java| 2 +- .../apache/storm/sql/calcite/ParallelTable.java | 28 + .../storm/sql/calcite/StormStreamableTable.java | 26 + .../apache/storm/sql/calcite/StormTable.java| 26 + .../apache/storm/sql/compiler/CompilerUtil.java | 85 ++- .../sql/compiler/RexNodeToJavaCodeCompiler.java | 22 +- .../storm/sql/javac/CompilingClassLoader.java | 26 +- .../storm/sql/parser/ColumnConstraint.java | 2 +- .../apache/storm/sql/parser/SqlCreateTable.java | 14 +- .../apache/storm/sql/parser/SqlDDLKeywords.java | 22 - .../apache/storm/sql/parser/SqlDdlKeywords.java | 22 + .../apache/storm/sql/parser/StormParser.java| 3 + .../apache/storm/sql/planner/StormRelUtils.java | 12 +- .../sql/planner/rel/StormStreamScanRelBase.java | 2 - .../storm/sql/planner/streams/QueryPlanner.java | 158 ++ .../sql/planner/streams/StreamsPlanCreator.java | 126 + .../planner/streams/StreamsStormRuleSets.java | 110 .../sql/planner/streams/rel/StreamsCalcRel.java | 96 .../planner/streams/rel/StreamsFilterRel.java | 66 +++ .../streams/rel/StreamsLogicalConvention.java | 69 +++ .../planner/streams/rel/StreamsProjectRel.java | 69 +++ .../sql/planner/streams/rel/StreamsRel.java | 27 + .../streams/rel/StreamsStreamInsertRel.java | 81 +++ .../streams/rel/StreamsStreamScanRel.java | 58 ++ .../streams/rules/StreamsAggregateRule.java | 40 ++ .../planner/streams/rules/StreamsCalcRule.java | 46 ++ .../streams/rules/StreamsFilterRule.java| 47 ++ .../planner/streams/rules/StreamsJoinRule.java | 39 ++ .../streams/rules/StreamsModifyRule.java| 89 +++ .../streams/rules/StreamsProjectRule.java | 48 ++ .../planner/streams/rules/StreamsScanRule.java | 61 ++ .../storm/sql/planner/trident/QueryPlanner.java | 156 -- .../sql/planner/trident/TridentPlanCreator.java | 117 .../planner/trident/TridentStormRuleSets.java | 110 .../sql/planner/trident/rel/TridentCalcRel.java | 92 --- .../planner/trident/rel/TridentFilterRel.java | 59 -- .../trident/rel/TridentLogicalConvention.java | 62 --- .../planner/trident/rel/TridentProjectRel.java | 64 --- .../sql/planner/trident/rel/TridentRel.java | 20 - .../trident/rel/TridentStreamInsertRel.java | 72 --- .../trident/rel/TridentStreamScanRel.java | 49 -- .../trident/rules/TridentAggregateRule.java | 33 -- .../planner/trident/rules/TridentCalcRule.java | 39 -- .../trident/rules/TridentFilterRule.java| 40 -- .../planner/trident/rules/TridentJoinRule.java | 32 -- .../trident/rules/TridentModifyRule.java| 65 --- .../trident/rules/TridentProjectRule.java | 41 -- .../planner/trident/rules/TridentScanRule.java | 53 -- .../test/org/apache/storm/sql/SqlTestUtil.java | 19 +- .../storm/sql/StormSqlLocalClusterImpl.java | 8 +- .../test/org/apache/storm/sql/TestStormSql.java | 84 +-- .../backends/streams/TestCompilerUtils.java | 217 .../backends/streams/TestPlanCompiler.java | 235
[5/7] storm git commit: STORM-2406 [Storm SQL] Change underlying API to Streams API
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/QueryPlanner.java -- diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/QueryPlanner.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/QueryPlanner.java new file mode 100644 index 000..180232e --- /dev/null +++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/QueryPlanner.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.storm.sql.planner.streams; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.calcite.DataContext; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.config.CalciteConnectionConfigImpl; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.plan.Contexts; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.sql.util.ChainedSqlOperatorTable; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.Planner; +import org.apache.calcite.tools.RelConversionException; +import org.apache.calcite.tools.ValidationException; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.sql.AbstractStreamsProcessor; +import org.apache.storm.sql.javac.CompilingClassLoader; +import org.apache.storm.sql.planner.StormRelDataTypeSystem; +import org.apache.storm.sql.planner.UnsupportedOperatorsVisitor; +import org.apache.storm.sql.planner.streams.rel.StreamsLogicalConvention; +import org.apache.storm.sql.planner.streams.rel.StreamsRel; +import org.apache.storm.sql.runtime.ISqlStreamsDataSource; +import org.apache.storm.streams.Stream; +import org.apache.storm.streams.StreamBuilder; +import org.apache.storm.tuple.Values; + +public class QueryPlanner { + +public static final int STORM_REL_CONVERSION_RULES = 1; + +private final Planner planner; + +private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl( +RelDataTypeSystem.DEFAULT); + +public QueryPlanner(SchemaPlus schema) { +final List traitDefs = new ArrayList(); + +traitDefs.add(ConventionTraitDef.INSTANCE); +traitDefs.add(RelCollationTraitDef.INSTANCE); + +List sqlOperatorTables = new ArrayList<>(); +sqlOperatorTables.add(SqlStdOperatorTable.instance()); +sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema), +Collections.emptyList(), typeFactory, new CalciteConnectionConfigImpl(new Properties(; + +FrameworkConfig config = Frameworks.newConfigBuilder() + .defaultSchema(schema) + .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables)) + .traitDefs(traitDefs) + .context(Contexts.EMPTY_CONTEXT) + .ruleSets(StreamsStormRuleSets.getRuleSets()) + .costFactory(null) + .typeSystem(StormRelDataTypeSystem.STORM_REL_DATATYPE_SYSTEM) + .build(); +this.planner = Frameworks.getPlanner(config); +} + +public AbstractStreamsProcessor
[3/7] storm git commit: STORM-2406 [Storm SQL] Change underlying API to Streams API
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestExpressions.java -- diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestExpressions.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestExpressions.java deleted file mode 100644 index d01aa31..000 --- a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestExpressions.java +++ /dev/null @@ -1,373 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.storm.sql.compiler.backends.trident; - -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.collect.Lists; -import java.math.BigDecimal; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.List; -import javax.annotation.Nullable; -import org.apache.calcite.adapter.java.ReflectiveSchema; -import org.apache.calcite.jdbc.CalciteConnection; -import org.apache.calcite.schema.SchemaPlus; -import org.apache.storm.tuple.Values; -import org.junit.Assert; -import org.junit.Test; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -/** - * Only for testing expression. We're leveraging Avatica to make tests faster. - */ -public class TestExpressions { -@Test -public void testLogicalExpr() throws Exception { -List v = testExpr( -Lists.newArrayList("s.\"id\" > 0 OR s.\"id\" < 1", "s.\"id\" > 0 AND s.\"id\" < 1", - "NOT (s.\"id\" > 0 AND s.\"id\" < 1)")); -assertEquals(new Values(true, false, true), v); -} - -@Test -public void testExpectOperator() throws Exception { -List v = testExpr( -Lists.newArrayList("TRUE IS TRUE", "TRUE IS NOT TRUE", - "UNKNOWN IS TRUE", "UNKNOWN IS NOT TRUE", - "TRUE IS FALSE", "UNKNOWN IS NULL", - "UNKNOWN IS NOT NULL")); -assertEquals(new Values(true, false, false, true, false, true, false), v); -} - -@Test -public void testDistinctBetweenLikeSimilarIn() throws Exception { -List v = testExpr( -Lists.newArrayList("TRUE IS DISTINCT FROM TRUE", - "TRUE IS NOT DISTINCT FROM FALSE", "3 BETWEEN 1 AND 5", - "10 NOT BETWEEN 1 AND 5", "'hello' LIKE '_e%'", - "'world' NOT LIKE 'wor%'", "'abc' SIMILAR TO '[a-zA-Z]+[cd]{1}'", - "'abe' NOT SIMILAR TO '[a-zA-Z]+[cd]{1}'", "'3' IN ('1', '2', '3', '4')", - "2 NOT IN (1, 3, 5)")); -assertEquals(new Values(false, false, true, true, true, -false, true, true, true, true), v); -} - -@Test -public void testCaseStatement() throws Exception { -List v = testExpr( -Lists.newArrayList( -"CASE WHEN 'abcd' IN ('a', 'abc', 'abcde') THEN UPPER('a') " + -"WHEN UPPER('abcd') = 'AB' THEN 'b' ELSE {fn CONCAT('abcd', '#')} END", -"CASE WHEN 'ab' IN ('a', 'abc', 'abcde') THEN UPPER('a') " + -"WHEN UPPER('ab') = 'AB' THEN 'b' ELSE {fn CONCAT('ab', '#')} END", -"CASE WHEN 'abc' IN ('a', 'abc', 'abcde') THEN UPPER('a') " + -"WHEN UPPER('abc') = 'AB' THEN 'b' ELSE {fn CONCAT('abc', '#')} END" -) -); - -// TODO: The data type of literal Calcite assigns seems to be out of expectation. Please see below logical plan. -// LogicalProject(EXPR$0=[CASE(OR(=('abcd', 'a'), =('abcd', 'abc'), =('abcd', 'abcde')), CAST(UPPER('a')):VARCHAR(5) CHARACTER -// SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL, =(UPPER('abcd'), CAST('AB'):CHAR(4) CHARACTER SET "ISO-8859-1" -// COLLATE "ISO-8859-1$en_US$primary"
[7/7] storm git commit: Merge branch 'STORM-2406'
Merge branch 'STORM-2406' Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/af42f434 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/af42f434 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/af42f434 Branch: refs/heads/master Commit: af42f434f4a4c3d9087c6058b359033736d3b5e8 Parents: c9e9a7c c69a23c Author: Jungtaek Lim Authored: Tue Jul 17 07:00:29 2018 +0900 Committer: Jungtaek Lim Committed: Tue Jul 17 07:00:29 2018 +0900 -- ...ql-internal-example-exported-from-drawio.xml | 2 + docs/images/storm-sql-internal-example.png | Bin 28377 -> 63265 bytes ...l-internal-workflow-exported-from-drawio.xml | 2 + docs/images/storm-sql-internal-workflow.png | Bin 20020 -> 42408 bytes docs/storm-sql-internal.md | 16 +- docs/storm-sql-reference.md | 10 +- docs/storm-sql.md | 16 +- pom.xml | 2 +- sql/README.md | 183 +- sql/storm-sql-core/pom.xml | 24 + .../storm/sql/AbstractStreamsProcessor.java | 67 +++ .../storm/sql/AbstractTridentProcessor.java | 51 -- .../org/apache/storm/sql/StormSqlContext.java | 22 +- .../jvm/org/apache/storm/sql/StormSqlImpl.java | 12 +- .../sql/calcite/ParallelStreamableTable.java| 2 +- .../apache/storm/sql/calcite/ParallelTable.java | 28 + .../storm/sql/calcite/StormStreamableTable.java | 26 + .../apache/storm/sql/calcite/StormTable.java| 26 + .../apache/storm/sql/compiler/CompilerUtil.java | 85 ++- .../sql/compiler/RexNodeToJavaCodeCompiler.java | 22 +- .../storm/sql/javac/CompilingClassLoader.java | 26 +- .../storm/sql/parser/ColumnConstraint.java | 2 +- .../apache/storm/sql/parser/SqlCreateTable.java | 14 +- .../apache/storm/sql/parser/SqlDDLKeywords.java | 22 - .../apache/storm/sql/parser/SqlDdlKeywords.java | 22 + .../apache/storm/sql/parser/StormParser.java| 3 + .../apache/storm/sql/planner/StormRelUtils.java | 12 +- .../sql/planner/rel/StormStreamScanRelBase.java | 2 - .../storm/sql/planner/streams/QueryPlanner.java | 158 ++ .../sql/planner/streams/StreamsPlanCreator.java | 126 + .../planner/streams/StreamsStormRuleSets.java | 110 .../sql/planner/streams/rel/StreamsCalcRel.java | 96 .../planner/streams/rel/StreamsFilterRel.java | 66 +++ .../streams/rel/StreamsLogicalConvention.java | 69 +++ .../planner/streams/rel/StreamsProjectRel.java | 69 +++ .../sql/planner/streams/rel/StreamsRel.java | 27 + .../streams/rel/StreamsStreamInsertRel.java | 81 +++ .../streams/rel/StreamsStreamScanRel.java | 58 ++ .../streams/rules/StreamsAggregateRule.java | 40 ++ .../planner/streams/rules/StreamsCalcRule.java | 46 ++ .../streams/rules/StreamsFilterRule.java| 47 ++ .../planner/streams/rules/StreamsJoinRule.java | 39 ++ .../streams/rules/StreamsModifyRule.java| 89 +++ .../streams/rules/StreamsProjectRule.java | 48 ++ .../planner/streams/rules/StreamsScanRule.java | 61 ++ .../storm/sql/planner/trident/QueryPlanner.java | 156 -- .../sql/planner/trident/TridentPlanCreator.java | 117 .../planner/trident/TridentStormRuleSets.java | 110 .../sql/planner/trident/rel/TridentCalcRel.java | 92 --- .../planner/trident/rel/TridentFilterRel.java | 59 -- .../trident/rel/TridentLogicalConvention.java | 62 --- .../planner/trident/rel/TridentProjectRel.java | 64 --- .../sql/planner/trident/rel/TridentRel.java | 20 - .../trident/rel/TridentStreamInsertRel.java | 72 --- .../trident/rel/TridentStreamScanRel.java | 49 -- .../trident/rules/TridentAggregateRule.java | 33 -- .../planner/trident/rules/TridentCalcRule.java | 39 -- .../trident/rules/TridentFilterRule.java| 40 -- .../planner/trident/rules/TridentJoinRule.java | 32 -- .../trident/rules/TridentModifyRule.java| 65 --- .../trident/rules/TridentProjectRule.java | 41 -- .../planner/trident/rules/TridentScanRule.java | 53 -- .../test/org/apache/storm/sql/SqlTestUtil.java | 19 +- .../storm/sql/StormSqlLocalClusterImpl.java | 8 +- .../test/org/apache/storm/sql/TestStormSql.java | 84 +-- .../backends/streams/TestCompilerUtils.java | 217 .../backends/streams/TestPlanCompiler.java | 235 .../backends/trident/TestCompilerUtils.java | 207 --- .../backends/trident/TestExpressions.java | 373 - .../backends/trident/TestPlanCompiler.java | 231 .../storm/sql/hdfs/HdfsDataSourcesProvider.java | 75 +-- .../sql/hdfs/TestHdfsDataSourcesProvider.java | 59 +- .../sql/kafka/KafkaDataSourcesProvider.java | 74 ++- .../sql/kafka/TestKafkaDataSourcesProvider.java | 64 +--
[1/7] storm git commit: STORM-2406 [Storm SQL] Change underlying API to Streams API
Repository: storm Updated Branches: refs/heads/master c9e9a7c29 -> af42f434f http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-runtime/src/test/org/apache/storm/sql/runtime/datasource/socket/TestSocketDataSourceProvider.java -- diff --git a/sql/storm-sql-runtime/src/test/org/apache/storm/sql/runtime/datasource/socket/TestSocketDataSourceProvider.java b/sql/storm-sql-runtime/src/test/org/apache/storm/sql/runtime/datasource/socket/TestSocketDataSourceProvider.java deleted file mode 100644 index 6720c4b..000 --- a/sql/storm-sql-runtime/src/test/org/apache/storm/sql/runtime/datasource/socket/TestSocketDataSourceProvider.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.sql.runtime.datasource.socket; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import org.apache.storm.sql.runtime.DataSourcesRegistry; -import org.apache.storm.sql.runtime.FieldInfo; -import org.apache.storm.sql.runtime.ISqlTridentDataSource; -import org.apache.storm.sql.runtime.serde.json.JsonSerializer; -import org.apache.storm.sql.runtime.datasource.socket.trident.SocketState; -import org.apache.storm.sql.runtime.datasource.socket.trident.SocketStateUpdater; -import org.apache.storm.trident.state.StateUpdater; -import org.apache.storm.trident.tuple.TridentTuple; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; - -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class TestSocketDataSourceProvider { -private static final List FIELDS = ImmutableList.of( -new FieldInfo("ID", int.class, true), -new FieldInfo("val", String.class, false)); -private static final List FIELD_NAMES = ImmutableList.of("ID", "val"); -private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES); - -@Test -public void testSocketSink() throws IOException { -ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource( -URI.create("socket://localhost:"), null, null, new Properties(), FIELDS); -Assert.assertNotNull(ds); - -ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer(); - -Assert.assertEquals(SocketState.Factory.class, consumer.getStateFactory().getClass()); -Assert.assertEquals(SocketStateUpdater.class, consumer.getStateUpdater().getClass()); - -// makeState() fails on creating State so we just mock SocketState anyway -SocketState mockState = mock(SocketState.class); -StateUpdater stateUpdater = consumer.getStateUpdater(); - -List tupleList = mockTupleList(); - -stateUpdater.updateState(mockState, tupleList, null); -for (TridentTuple t : tupleList) { -String serializedValue = new String(SERIALIZER.write(t.getValues(), null).array()); -verify(mockState).write(serializedValue + "\n"); -} -} - -private static List mockTupleList() { -List tupleList = new ArrayList<>(); -TridentTuple t0 = mock(TridentTuple.class); -TridentTuple t1 = mock(TridentTuple.class); -when(t0.getValueByField("ID")).thenReturn(1); -when(t0.getValueByField("val")).thenReturn("2"); -doReturn(Lists.newArrayList(1, "2")).when(t0).getValues(); -when(t0.size()).thenReturn(2); - -when(t1.getValueByField("ID")).thenReturn(2); -when(t1.getValueByField("val")).thenReturn("3"); -doReturn(Lists.newArrayList(2, "3")).when(t1).getValues(); -when(t1.size()).thenReturn(2); - -tupleList.add(t0); -tupleList.add(t1); -return tupleList; -} -}
[2/3] storm git commit: STORM-2974: Add transactional spout to storm-kafka-client
STORM-2974: Add transactional spout to storm-kafka-client Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ba526077 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ba526077 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ba526077 Branch: refs/heads/master Commit: ba5260774bbd436e81da14a97c072e399a70a896 Parents: 2d7c7d3 Author: Stig Rohde Døssing Authored: Fri Mar 9 20:57:53 2018 +0100 Committer: Stig Rohde Døssing Committed: Tue Jul 10 11:27:47 2018 +0200 -- bin/storm.py| 4 +- docs/Command-line-client.md | 2 +- docs/Transactional-topologies.md| 6 +- docs/Trident-state.md | 4 +- docs/flux.md| 2 +- .../TridentKafkaClientTopologyNamedTopics.java | 16 +- .../kafka/spout/EmptyKafkaTupleListener.java| 1 - .../apache/storm/kafka/spout/KafkaSpout.java| 90 .../storm/kafka/spout/KafkaSpoutConfig.java | 1 - .../storm/kafka/spout/KafkaTupleListener.java | 1 - .../kafka/spout/internal/ConsumerFactory.java | 28 +++ .../spout/internal/ConsumerFactoryDefault.java | 29 +++ .../spout/internal/KafkaConsumerFactory.java| 28 --- .../internal/KafkaConsumerFactoryDefault.java | 29 --- .../kafka/spout/internal/OffsetManager.java | 2 - .../kafka/spout/metrics/KafkaOffsetMetric.java | 15 +- .../spout/subscription/NamedTopicFilter.java| 4 +- .../spout/subscription/PatternTopicFilter.java | 7 +- .../kafka/spout/subscription/TopicAssigner.java | 5 +- .../kafka/spout/subscription/TopicFilter.java | 4 +- .../trident/KafkaTridentOpaqueSpoutEmitter.java | 68 ++ .../trident/KafkaTridentSpoutBatchMetadata.java | 23 +- .../trident/KafkaTridentSpoutCoordinator.java | 97 .../spout/trident/KafkaTridentSpoutEmitter.java | 219 --- .../spout/trident/KafkaTridentSpoutOpaque.java | 18 +- .../KafkaTridentSpoutOpaqueCoordinator.java | 94 .../trident/KafkaTridentSpoutTransactional.java | 65 ++ .../KafkaTridentTransactionalSpoutEmitter.java | 68 ++ .../trident/internal/OutputFieldsExtractor.java | 41 .../kafka/spout/KafkaSpoutAbstractTest.java | 11 +- .../storm/kafka/spout/KafkaSpoutEmitTest.java | 10 +- .../KafkaSpoutLogCompactionSupportTest.java | 39 ++-- .../kafka/spout/KafkaSpoutReactivationTest.java | 12 +- .../kafka/spout/KafkaSpoutRebalanceTest.java| 32 +-- .../kafka/spout/KafkaSpoutRetryLimitTest.java | 19 +- .../kafka/spout/MaxUncommittedOffsetTest.java | 7 +- .../spout/SingleTopicKafkaUnitSetupHelper.java | 4 +- .../SpoutWithMockedConsumerSetupHelper.java | 4 +- .../KafkaTridentSpoutBatchMetadataTest.java | 10 +- .../trident/KafkaTridentSpoutEmitterTest.java | 147 + .../KafkaTridentSpoutOpaqueCoordinatorTest.java | 9 +- 41 files changed, 833 insertions(+), 442 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/ba526077/bin/storm.py -- diff --git a/bin/storm.py b/bin/storm.py index 23bab73..3767b45 100755 --- a/bin/storm.py +++ b/bin/storm.py @@ -361,7 +361,7 @@ def jar(jarfile, klass, *args): And when you want to ship maven artifacts and its transitive dependencies, you can pass them to --artifacts with comma-separated string. You can also exclude some dependencies like what you're doing in maven pom. Please add exclusion artifacts with '^' separated string after the artifact. -For example, --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12" will load jedis and kafka artifact and all of transitive dependencies but exclude slf4j-log4j12 from kafka. +For example, -artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka-clients:1.0.0^org.slf4j:slf4j-api" will load jedis and kafka-clients artifact and all of transitive dependencies but exclude slf4j-api from kafka. When you need to pull the artifacts from other than Maven Central, you can pass remote repositories to --artifactRepositories option with comma-separated string. Repository format is "^". '^' is taken as separator because URL allows various characters. @@ -373,7 +373,7 @@ def jar(jarfile, klass, *args): --proxyUsername: username of proxy if it requires basic auth --proxyPassword: password of proxy if it requires basic auth -Complete example of options is here: `./bin/storm jar example/storm-starter/storm-starter-topologies-*.jar org.apache.storm.starter.RollingTopWords blobstore-remote2 remote --jars "./external/storm-redis/storm-redis-1.1.0.jar,./external/storm-kafka/storm-kafka-1.1.0.jar"
[1/3] storm git commit: STORM-2974: Add transactional spout to storm-kafka-client
Repository: storm Updated Branches: refs/heads/master efb2e9a33 -> e21110d33 http://git-wip-us.apache.org/repos/asf/storm/blob/ba526077/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java -- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java index 27e75c2..22f21c7 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java @@ -23,6 +23,7 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrat import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST; +import com.google.common.annotations.VisibleForTesting; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; @@ -33,40 +34,35 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.KafkaSpoutConfig; import org.apache.storm.kafka.spout.RecordTranslator; import org.apache.storm.kafka.spout.TopicPartitionComparator; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; +import org.apache.storm.kafka.spout.internal.ConsumerFactory; +import org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault; import org.apache.storm.kafka.spout.subscription.TopicAssigner; import org.apache.storm.task.TopologyContext; import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout; import org.apache.storm.trident.topology.TransactionAttempt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class KafkaTridentSpoutEmitter implements IOpaquePartitionedTridentSpout.Emitter< -List>, -KafkaTridentSpoutTopicPartition, -Map>, -Serializable { +public class KafkaTridentSpoutEmitter implements Serializable { private static final long serialVersionUID = -7343927794834130435L; private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class); // Kafka -private final KafkaConsumer kafkaConsumer; +private final Consumer consumer; private final KafkaSpoutConfig kafkaSpoutConfig; private final TopicAssigner topicAssigner; - + // The first seek offset for each topic partition, i.e. the offset this spout instance started processing at. -private final Map tpToFirstSeekOffset = new HashMap<>(); +private final Map tpToFirstSeekOffset = new HashMap<>(); private final long pollTimeoutMs; private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy; @@ -76,17 +72,19 @@ public class KafkaTridentSpoutEmitter implements IOpaquePartitionedTrident /** * Create a new Kafka spout emitter. + * * @param kafkaSpoutConfig The kafka spout config * @param topologyContext The topology context */ public KafkaTridentSpoutEmitter(KafkaSpoutConfig kafkaSpoutConfig, TopologyContext topologyContext) { -this(kafkaSpoutConfig, topologyContext, new KafkaConsumerFactoryDefault<>(), new TopicAssigner()); +this(kafkaSpoutConfig, topologyContext, new ConsumerFactoryDefault<>(), new TopicAssigner()); } - + +@VisibleForTesting KafkaTridentSpoutEmitter(KafkaSpoutConfig kafkaSpoutConfig, TopologyContext topologyContext, -KafkaConsumerFactory consumerFactory, TopicAssigner topicAssigner) { +ConsumerFactory consumerFactory, TopicAssigner topicAssigner) { this.kafkaSpoutConfig = kafkaSpoutConfig; -this.kafkaConsumer = consumerFactory.createConsumer(kafkaSpoutConfig); +this.consumer = consumerFactory.createConsumer(kafkaSpoutConfig); this.topologyContext = topologyContext; this.translator = kafkaSpoutConfig.getTranslator(); this.topicAssigner = topicAssigner; @@ -95,57 +93,123 @@ public class KafkaTridentSpoutEmitter implements IOpaquePartitionedTrident LOG.debug("Created {}", this.toString()); } -@Override -
[3/3] storm git commit: Merge branch 'STORM-2974' of https://github.com/srdo/storm into STORM-2974-merge
Merge branch 'STORM-2974' of https://github.com/srdo/storm into STORM-2974-merge Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e21110d3 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e21110d3 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e21110d3 Branch: refs/heads/master Commit: e21110d338fe8ca71b904682be35642a00de9e78 Parents: efb2e9a ba52607 Author: Jungtaek Lim Authored: Fri Jul 13 21:25:41 2018 +0900 Committer: Jungtaek Lim Committed: Fri Jul 13 21:25:41 2018 +0900 -- bin/storm.py| 4 +- docs/Command-line-client.md | 2 +- docs/Transactional-topologies.md| 6 +- docs/Trident-state.md | 4 +- docs/flux.md| 2 +- .../TridentKafkaClientTopologyNamedTopics.java | 16 +- .../kafka/spout/EmptyKafkaTupleListener.java| 1 - .../apache/storm/kafka/spout/KafkaSpout.java| 90 .../storm/kafka/spout/KafkaSpoutConfig.java | 1 - .../storm/kafka/spout/KafkaTupleListener.java | 1 - .../kafka/spout/internal/ConsumerFactory.java | 28 +++ .../spout/internal/ConsumerFactoryDefault.java | 29 +++ .../spout/internal/KafkaConsumerFactory.java| 28 --- .../internal/KafkaConsumerFactoryDefault.java | 29 --- .../kafka/spout/internal/OffsetManager.java | 2 - .../kafka/spout/metrics/KafkaOffsetMetric.java | 15 +- .../spout/subscription/NamedTopicFilter.java| 4 +- .../spout/subscription/PatternTopicFilter.java | 7 +- .../kafka/spout/subscription/TopicAssigner.java | 5 +- .../kafka/spout/subscription/TopicFilter.java | 4 +- .../trident/KafkaTridentOpaqueSpoutEmitter.java | 68 ++ .../trident/KafkaTridentSpoutBatchMetadata.java | 23 +- .../trident/KafkaTridentSpoutCoordinator.java | 97 .../spout/trident/KafkaTridentSpoutEmitter.java | 219 --- .../spout/trident/KafkaTridentSpoutOpaque.java | 18 +- .../KafkaTridentSpoutOpaqueCoordinator.java | 94 .../trident/KafkaTridentSpoutTransactional.java | 65 ++ .../KafkaTridentTransactionalSpoutEmitter.java | 68 ++ .../trident/internal/OutputFieldsExtractor.java | 41 .../kafka/spout/KafkaSpoutAbstractTest.java | 11 +- .../storm/kafka/spout/KafkaSpoutEmitTest.java | 10 +- .../KafkaSpoutLogCompactionSupportTest.java | 39 ++-- .../kafka/spout/KafkaSpoutReactivationTest.java | 12 +- .../kafka/spout/KafkaSpoutRebalanceTest.java| 32 +-- .../kafka/spout/KafkaSpoutRetryLimitTest.java | 19 +- .../kafka/spout/MaxUncommittedOffsetTest.java | 7 +- .../spout/SingleTopicKafkaUnitSetupHelper.java | 4 +- .../SpoutWithMockedConsumerSetupHelper.java | 4 +- .../KafkaTridentSpoutBatchMetadataTest.java | 10 +- .../trident/KafkaTridentSpoutEmitterTest.java | 147 + .../KafkaTridentSpoutOpaqueCoordinatorTest.java | 9 +- 41 files changed, 833 insertions(+), 442 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/e21110d3/bin/storm.py -- http://git-wip-us.apache.org/repos/asf/storm/blob/e21110d3/docs/Command-line-client.md --
[1/2] storm git commit: STORM-3013: Keep KafkaConsumer open when storm-kafka-client spout is deactivated, in order to keep metrics working
Repository: storm Updated Branches: refs/heads/1.x-branch d5ead6681 -> 3b5f9e7c7 STORM-3013: Keep KafkaConsumer open when storm-kafka-client spout is deactivated, in order to keep metrics working Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b7e19c7b Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b7e19c7b Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b7e19c7b Branch: refs/heads/1.x-branch Commit: b7e19c7b238da2ab4e89c09db2a60f61ba3ccfe6 Parents: 8a16fec Author: Stig Rohde Døssing Authored: Fri Mar 30 19:12:01 2018 +0200 Committer: Stig Rohde Døssing Committed: Tue Jul 10 10:50:57 2018 +0200 -- .../apache/storm/kafka/spout/KafkaSpout.java| 43 +++- .../kafka/spout/metrics/KafkaOffsetMetric.java | 6 +-- .../kafka/spout/KafkaSpoutAbstractTest.java | 6 ++- .../kafka/spout/KafkaSpoutReactivationTest.java | 54 ++-- .../kafka/spout/KafkaSpoutSingleTopicTest.java | 2 +- ...outTopologyDeployActivateDeactivateTest.java | 2 - 6 files changed, 66 insertions(+), 47 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/b7e19c7b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java -- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 76c546e..805bbd2 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -24,7 +24,6 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrat import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST; import com.google.common.annotations.VisibleForTesting; - import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -103,7 +102,7 @@ public class KafkaSpout extends BaseRichSpout { private transient Timer refreshSubscriptionTimer; private transient TopologyContext context; private transient CommitMetadataManager commitMetadataManager; -private transient KafkaOffsetMetric kafkaOffsetMetric; +private transient KafkaOffsetMetric kafkaOffsetMetric; public KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig) { this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault()); @@ -141,6 +140,8 @@ public class KafkaSpout extends BaseRichSpout { waitingToEmit = new HashMap<>(); commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee()); +kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig); + tupleListener.open(conf, context); if (canRegisterMetrics()) { registerMetric(); @@ -192,7 +193,7 @@ public class KafkaSpout extends BaseRichSpout { kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); if (isAtLeastOnceProcessing()) { -commitOffsetsForAckedTuples(new HashSet<>(partitions)); +commitOffsetsForAckedTuples(); } } @@ -287,7 +288,7 @@ public class KafkaSpout extends BaseRichSpout { if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) { if (isAtLeastOnceProcessing()) { -commitOffsetsForAckedTuples(kafkaConsumer.assignment()); +commitOffsetsForAckedTuples(); } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) { Map offsetsToCommit = createFetchedOffsetsMetadata(kafkaConsumer.assignment()); @@ -517,17 +518,9 @@ public class KafkaSpout extends BaseRichSpout { return offsetsToCommit; } -private void commitOffsetsForAckedTuples(Set assignedPartitions) { -// Find offsets that are ready to be committed for every assigned topic partition -final Map assignedOffsetManagers = new HashMap<>(); -for (Entry entry : offsetManagers.entrySet()) { -if (assignedPartitions.contains(entry.getKey())) { -assignedOffsetManagers.put(entry.getKey(), entry.getValue()); -} -} - +private void commitOffsetsForAckedTuples() { final Map nextCommitOffsets = new HashMap<>(); -for (Map.Entry tpOffset : assignedOffsetManagers.entrySet()) { +for (Map.Entry tpOffset : offsetManagers.entrySet()) { final OffsetAndMetadata nextCommitOffset =
[2/2] storm git commit: Merge branch 'STORM-3013-1.x' of https://github.com/srdo/storm into STORM-3013-1.x-merge
Merge branch 'STORM-3013-1.x' of https://github.com/srdo/storm into STORM-3013-1.x-merge Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3b5f9e7c Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3b5f9e7c Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3b5f9e7c Branch: refs/heads/1.x-branch Commit: 3b5f9e7c75b378881060f454d39e6bf653e0bcb3 Parents: d5ead66 b7e19c7 Author: Jungtaek Lim Authored: Fri Jul 13 11:15:41 2018 +0900 Committer: Jungtaek Lim Committed: Fri Jul 13 11:15:41 2018 +0900 -- .../apache/storm/kafka/spout/KafkaSpout.java| 43 +++- .../kafka/spout/metrics/KafkaOffsetMetric.java | 6 +-- .../kafka/spout/KafkaSpoutAbstractTest.java | 6 ++- .../kafka/spout/KafkaSpoutReactivationTest.java | 54 ++-- .../kafka/spout/KafkaSpoutSingleTopicTest.java | 2 +- ...outTopologyDeployActivateDeactivateTest.java | 2 - 6 files changed, 66 insertions(+), 47 deletions(-) --