[2/3] storm git commit: Merge branch 'STORM-1629'
Merge branch 'STORM-1629'

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/36ed8ef1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/36ed8ef1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/36ed8ef1

Branch: refs/heads/master
Commit: 36ed8ef1a968ede5e9c60395c13381ef796e7a6a
Parents: 367464a 02f9308
Author: Jungtaek Lim
Authored: Sun Mar 20 10:36:58 2016 +0900
Committer: Jungtaek Lim
Committed: Sun Mar 20 10:36:58 2016 +0900

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/daemon/supervisor.clj | 10 +++---
 1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
[1/3] storm git commit: STORM-1629 Files/move doesn't work properly with non-empty directory in Windows
Repository: storm
Updated Branches:
  refs/heads/master 367464a3d -> a726589d8

STORM-1629 Files/move doesn't work properly with non-empty directory in Windows

* Use FileUtils/moveDirectory on Windows
  * It copies whole contents inside directory, and delete directory
* Keep using Files/move on non-Windows
  * it's still better option since doesn't require copying contents inside directory

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/02f9308d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/02f9308d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/02f9308d

Branch: refs/heads/master
Commit: 02f9308d80da67b6da634b96a08e169268bd9262
Parents: 1a0ed9a
Author: Jungtaek Lim
Authored: Wed Mar 16 00:06:58 2016 +0900
Committer: Jungtaek Lim
Committed: Wed Mar 16 00:06:58 2016 +0900

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/daemon/supervisor.clj | 10 +++---
 1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/02f9308d/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index fd8f6c9..498e10a 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -1060,9 +1060,13 @@
         (if (download-blobs-for-topology-succeed? (ConfigUtils/supervisorStormConfPath tmproot) tmproot)
           (do
             (log-message "Successfully downloaded blob resources for storm-id " storm-id)
-            (FileUtils/forceMkdir (File. stormroot))
-            (Files/move (.toPath (File. tmproot)) (.toPath (File. stormroot))
-              (doto (make-array StandardCopyOption 1) (aset 0 StandardCopyOption/ATOMIC_MOVE)))
+            (if (Utils/isOnWindows)
+              ; Files/move with non-empty directory doesn't work well on Windows
+              (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
+              (do
+                (FileUtils/forceMkdir (File. stormroot))
+                (Files/move (.toPath (File. tmproot)) (.toPath (File. stormroot))
+                            (doto (make-array StandardCopyOption 1) (aset 0 StandardCopyOption/ATOMIC_MOVE)))))
             (setup-storm-code-dir conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id)) stormroot))
           (do
             (log-message "Failed to download blob resources for storm-id " storm-id)
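The branching in this change can be sketched in plain Java using only the JDK. This is an illustration under assumptions, not Storm's code: the class name `DirectoryMoveSketch`, the `copyThenDelete` flag, and the file names in `demo()` are hypothetical, and the copy-then-delete branch is a hand-written stand-in for what the commit message says `FileUtils/moveDirectory` does (copy the contents, then delete the source directory).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.BasicFileAttributes;

final class DirectoryMoveSketch {

    // Hypothetical stand-in for Storm's Windows check; only inspects os.name.
    static boolean isOnWindows() {
        return System.getProperty("os.name", "").toLowerCase().startsWith("windows");
    }

    // Moves src to dst. With copyThenDelete == false this is a single atomic
    // rename (needs src and dst on the same file store, and dst must not exist).
    // With copyThenDelete == true the tree is copied and the source removed.
    static void moveDirectory(Path src, Path dst, boolean copyThenDelete) throws IOException {
        if (!copyThenDelete) {
            Files.move(src, dst, StandardCopyOption.ATOMIC_MOVE);
            return;
        }
        // First pass: recreate the directory structure and copy every file.
        Files.walkFileTree(src, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
                Files.createDirectories(dst.resolve(src.relativize(dir)));
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.copy(file, dst.resolve(src.relativize(file)), StandardCopyOption.REPLACE_EXISTING);
                return FileVisitResult.CONTINUE;
            }
        });
        // Second pass: delete files first, then each directory once it is empty.
        Files.walkFileTree(src, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                if (exc != null) {
                    throw exc;
                }
                Files.delete(dir);
                return FileVisitResult.CONTINUE;
            }
        });
    }

    // Exercises both strategies on a non-empty directory inside a temp folder.
    static boolean demo() {
        try {
            Path base = Files.createTempDirectory("move-sketch");
            boolean ok = true;
            for (boolean copyThenDelete : new boolean[] {false, true}) {
                Path src = base.resolve("tmp-" + copyThenDelete);
                Files.createDirectories(src.resolve("resources"));
                Files.write(src.resolve("resources").resolve("stormconf.ser"), new byte[] {1, 2, 3});
                Path dst = base.resolve("final-" + copyThenDelete);
                moveDirectory(src, dst, copyThenDelete);
                ok &= Files.notExists(src)
                        && Files.size(dst.resolve("resources").resolve("stormconf.ser")) == 3;
            }
            return ok;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A caller would pass `isOnWindows()` as the flag. The trade-off matches the commit message: the rename avoids copying the directory contents, while the copy-then-delete path costs more I/O and is not atomic.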
[3/3] storm git commit: add STORM-1629 to CHANGELOG.md
add STORM-1629 to CHANGELOG.md

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a726589d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a726589d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a726589d

Branch: refs/heads/master
Commit: a726589d8712a3d68eb286c934a25367b3650ee1
Parents: 36ed8ef
Author: Jungtaek Lim
Authored: Sun Mar 20 10:54:39 2016 +0900
Committer: Jungtaek Lim
Committed: Sun Mar 20 10:54:39 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/a726589d/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 61ad4df..d1f88b0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -62,6 +62,7 @@
  * STORM-1521: When using Kerberos login from keytab with multiple bolts/executors ticket is not renewed in hbase bolt.
 
 ## 1.0.0
+ * STORM-1629: Files/move doesn't work properly with non-empty directory in Windows
  * STORM-1549: Add support for resetting tuple timeout from bolts via the OutputCollector
  * STORM-971: Metric for messages lost due to kafka retention
  * STORM-1483: add storm-mongodb connector
[1/3] storm git commit: STORM-1629 Files/move doesn't work properly with non-empty directory in Windows
Repository: storm
Updated Branches:
  refs/heads/1.x-branch 885aaec43 -> a1a4df7a1

STORM-1629 Files/move doesn't work properly with non-empty directory in Windows

* Use FileUtils/moveDirectory on Windows
  * It copies whole contents inside directory, and delete directory
* Keep using Files/move on non-Windows
  * it's still better option since doesn't require copying contents inside directory

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7b559ef3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7b559ef3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7b559ef3

Branch: refs/heads/1.x-branch
Commit: 7b559ef3d8e02047a15860d261bc0a40d08834e0
Parents: 80213ba
Author: Jungtaek Lim
Authored: Wed Mar 16 00:37:28 2016 +0900
Committer: Jungtaek Lim
Committed: Wed Mar 16 00:37:28 2016 +0900

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/daemon/supervisor.clj | 10 +++---
 1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/7b559ef3/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 1287d77..b37b9da 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -959,9 +959,13 @@
         (if (download-blobs-for-topology-succeed? (supervisor-stormconf-path tmproot) tmproot)
           (do
             (log-message "Successfully downloaded blob resources for storm-id " storm-id)
-            (FileUtils/forceMkdir (File. stormroot))
-            (Files/move (.toPath (File. tmproot)) (.toPath (File. stormroot))
-              (doto (make-array StandardCopyOption 1) (aset 0 StandardCopyOption/ATOMIC_MOVE)))
+            (if on-windows?
+              ; Files/move with non-empty directory doesn't work well on Windows
+              (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
+              (do
+                (FileUtils/forceMkdir (File. stormroot))
+                (Files/move (.toPath (File. tmproot)) (.toPath (File. stormroot))
+                            (doto (make-array StandardCopyOption 1) (aset 0 StandardCopyOption/ATOMIC_MOVE)))))
             (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot))
           (do
             (log-message "Failed to download blob resources for storm-id " storm-id)
[3/3] storm git commit: add STORM-1629 to CHANGELOG.md
add STORM-1629 to CHANGELOG.md

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a1a4df7a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a1a4df7a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a1a4df7a

Branch: refs/heads/1.x-branch
Commit: a1a4df7a11dcd148dcdb7ae90852b2db8385e7e8
Parents: dc6608d
Author: Jungtaek Lim
Authored: Sun Mar 20 11:01:15 2016 +0900
Committer: Jungtaek Lim
Committed: Sun Mar 20 11:01:15 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/a1a4df7a/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 861cafd..1f6f81c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.0.0
+ * STORM-1629: Files/move doesn't work properly with non-empty directory in Windows
  * STORM-1616: Add RAS API for Trident
  * STORM-1483: add storm-mongodb connector
  * STORM-1614: backpressure init and cleanup changes
[1/3] storm git commit: STORM-1602 Blobstore UTs are failed on Windows
Repository: storm
Updated Branches:
  refs/heads/1.x-branch a1a4df7a1 -> 2d9f9036f

STORM-1602 Blobstore UTs are failed on Windows

* ensures objects of InputStream / OutputStream are closed after using
  * clojure: with-open
  * java: try-with-resource
* skip checking symbolic link in LocalizerTest when on Windows
  * Windows seems not handle symbolic link in compressed file properly

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8965a6c2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8965a6c2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8965a6c2

Branch: refs/heads/1.x-branch
Commit: 8965a6c2459841c607fb9587897502581f1e3cdf
Parents: eeeb7b9
Author: Jungtaek Lim
Authored: Thu Mar 17 15:53:15 2016 +0900
Committer: Jungtaek Lim
Committed: Thu Mar 17 15:53:15 2016 +0900

----------------------------------------------------------------------
 .../clj/org/apache/storm/daemon/supervisor.clj |   6 +-
 .../org/apache/storm/blobstore/BlobStore.java  |   5 +
 .../apache/storm/blobstore/BlobStoreTest.java  | 171 +--
 .../apache/storm/localizer/LocalizerTest.java  |  14 +-
 4 files changed, 105 insertions(+), 91 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/8965a6c2/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 1287d77..dc47988 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -1168,8 +1168,10 @@
           blob-store (Utils/getNimbusBlobStore conf master-code-dir nil)]
       (try
         (FileUtils/forceMkdir (File. tmproot))
-        (.readBlobTo blob-store (master-stormcode-key storm-id) (FileOutputStream. (supervisor-stormcode-path tmproot)) nil)
-        (.readBlobTo blob-store (master-stormconf-key storm-id) (FileOutputStream. (supervisor-stormconf-path tmproot)) nil)
+        (with-open [fos-storm-code (FileOutputStream. (supervisor-stormcode-path tmproot))
+                    fos-storm-conf (FileOutputStream. (supervisor-stormconf-path tmproot))]
+          (.readBlobTo blob-store (master-stormcode-key storm-id) fos-storm-code nil)
+          (.readBlobTo blob-store (master-stormconf-key storm-id) fos-storm-conf nil))
         (finally
           (.shutdown blob-store)))
       (FileUtils/moveDirectory (File. tmproot) (File. stormroot))

http://git-wip-us.apache.org/repos/asf/storm/blob/8965a6c2/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java
index 09093a2..14879b4 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java
@@ -396,6 +396,11 @@ public abstract class BlobStore implements Shutdownable {
         public long getFileLength() throws IOException {
             return part.getFileLength();
         }
+
+        @Override
+        public void close() throws IOException {
+            in.close();
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/8965a6c2/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java b/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java
index 712537a..5f6f50a 100644
--- a/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java
+++ b/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java
@@ -181,30 +181,30 @@ public class BlobStoreTest {
     Subject admin = getSubject("admin");
     assertStoreHasExactly(store);
     SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-    AtomicOutputStream out = store.createBlob("test", metadata, admin);
-    assertStoreHasExactly(store, "test");
-    out.write(1);
-    out.close();
+    try (AtomicOutputStream out = store.createBlob("test", metadata, admin)) {
+      assertStoreHasExactly(store, "test");
+      out.write(1);
+    }
     store.deleteBlob("test", admin);
 
     //Test for Supervisor Admin
     Subject supervisor = getSubject("supervisor");
     assertStoreHasExactly(store);
     metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-    out = store.createBlob("test", metadata, supervisor);
-    assertStoreHasExactly(store, "test");
-    out.write(1);
-    out.close();
+    try (AtomicOutputStream out = store.createBlob("test", metadata, supervisor)) {
+      assertStoreHasExactl
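Two ideas from this commit fit in one small Java sketch: a wrapper stream only releases the underlying resource if its `close()` delegates to the wrapped stream, and try-with-resources guarantees that `close()` is called when the block exits. The class and member names below (`CloseDelegationSketch`, `TrackingStream`, `WrappingStream`) are hypothetical and are not taken from Storm; the wrapper only mirrors the shape of the `close()` override added to `BlobStore.java`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

final class CloseDelegationSketch {

    // Test double that records whether close() reached the underlying stream.
    static final class TrackingStream extends ByteArrayInputStream {
        boolean closed = false;

        TrackingStream(byte[] data) {
            super(data);
        }

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    // Hypothetical wrapper: without the close() override below, the inherited
    // InputStream.close() does nothing and the wrapped stream stays open.
    static final class WrappingStream extends InputStream {
        private final InputStream in;

        WrappingStream(InputStream in) {
            this.in = in;
        }

        @Override
        public int read() throws IOException {
            return in.read();
        }

        @Override
        public void close() throws IOException {
            in.close();
        }
    }

    // Reads one byte inside try-with-resources and reports whether the inner
    // stream was closed once the block finished.
    static boolean demo() {
        TrackingStream inner = new TrackingStream(new byte[] {42});
        int first;
        try (InputStream wrapped = new WrappingStream(inner)) {
            first = wrapped.read();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return first == 42 && inner.closed;
    }
}
```

Unclosed handles are a likely cause of the Windows failures this commit targets, because Windows generally refuses to delete or move a file that still has an open handle. That connection is an inference from the commit message rather than something the message states.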
[3/3] storm git commit: add STORM-1602 to CHANGELOG.md
add STORM-1602 to CHANGELOG.md

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2d9f9036
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2d9f9036
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2d9f9036

Branch: refs/heads/1.x-branch
Commit: 2d9f9036fba685d7aaae5208e207bedf8d3d8d0d
Parents: 247f792
Author: Jungtaek Lim
Authored: Sun Mar 20 11:05:28 2016 +0900
Committer: Jungtaek Lim
Committed: Sun Mar 20 11:05:28 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/2d9f9036/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1f6f81c..4d34c58 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.0.0
+ * STORM-1602: Blobstore UTs are failed on Windows
  * STORM-1629: Files/move doesn't work properly with non-empty directory in Windows
  * STORM-1616: Add RAS API for Trident
  * STORM-1483: add storm-mongodb connector
[2/3] storm git commit: Merge branch 'STORM-1602-1.x' into 1.x-branch
Merge branch 'STORM-1602-1.x' into 1.x-branch

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/247f7927
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/247f7927
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/247f7927

Branch: refs/heads/1.x-branch
Commit: 247f79275341d5d8bb9f2c7d8ebb8457066eedbf
Parents: a1a4df7 8965a6c
Author: Jungtaek Lim
Authored: Sun Mar 20 11:05:06 2016 +0900
Committer: Jungtaek Lim
Committed: Sun Mar 20 11:05:06 2016 +0900

----------------------------------------------------------------------
 .../clj/org/apache/storm/daemon/supervisor.clj |   6 +-
 .../org/apache/storm/blobstore/BlobStore.java  |   5 +
 .../apache/storm/blobstore/BlobStoreTest.java  | 171 +--
 .../apache/storm/localizer/LocalizerTest.java  |  14 +-
 4 files changed, 105 insertions(+), 91 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/247f7927/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
[1/3] storm git commit: STORM-1602 Blobstore UTs are failed on Windows
Repository: storm
Updated Branches:
  refs/heads/master a726589d8 -> ab66003c1

STORM-1602 Blobstore UTs are failed on Windows

* ensures objects of InputStream / OutputStream are closed after using
  * clojure: with-open
  * java: try-with-resource
* skip checking symbolic link in LocalizerTest when on Windows
  * Windows seems not handle symbolic link in compressed file properly

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8dd66bfb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8dd66bfb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8dd66bfb

Branch: refs/heads/master
Commit: 8dd66bfb378b7b103694b7d968ad21483f3a3b80
Parents: 50701df
Author: Jungtaek Lim
Authored: Thu Mar 17 15:53:15 2016 +0900
Committer: Jungtaek Lim
Committed: Thu Mar 17 17:28:24 2016 +0900

----------------------------------------------------------------------
 .../clj/org/apache/storm/daemon/supervisor.clj |   6 +-
 .../org/apache/storm/blobstore/BlobStore.java  |   5 +
 .../apache/storm/blobstore/BlobStoreTest.java  | 171 +--
 .../apache/storm/localizer/LocalizerTest.java  |   7 +-
 4 files changed, 98 insertions(+), 91 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/8dd66bfb/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index fd8f6c9..695e7eb 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -1292,8 +1292,10 @@
           blob-store (Utils/getNimbusBlobStore conf master-code-dir nil)]
       (try
         (FileUtils/forceMkdir (File. tmproot))
-        (.readBlobTo blob-store (ConfigUtils/masterStormCodeKey storm-id) (FileOutputStream. (ConfigUtils/supervisorStormCodePath tmproot)) nil)
-        (.readBlobTo blob-store (ConfigUtils/masterStormConfKey storm-id) (FileOutputStream. (ConfigUtils/supervisorStormConfPath tmproot)) nil)
+        (with-open [fos-storm-code (FileOutputStream. (ConfigUtils/supervisorStormCodePath tmproot))
+                    fos-storm-conf (FileOutputStream. (ConfigUtils/supervisorStormConfPath tmproot))]
+          (.readBlobTo blob-store (ConfigUtils/masterStormCodeKey storm-id) fos-storm-code nil)
+          (.readBlobTo blob-store (ConfigUtils/masterStormConfKey storm-id) fos-storm-conf nil))
         (finally
           (.shutdown blob-store)))
       (FileUtils/moveDirectory (File. tmproot) (File. stormroot))

http://git-wip-us.apache.org/repos/asf/storm/blob/8dd66bfb/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java
index 09093a2..14879b4 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java
@@ -396,6 +396,11 @@ public abstract class BlobStore implements Shutdownable {
         public long getFileLength() throws IOException {
             return part.getFileLength();
         }
+
+        @Override
+        public void close() throws IOException {
+            in.close();
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/8dd66bfb/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java b/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java
index 8445e6a..151b5c6 100644
--- a/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java
+++ b/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java
@@ -182,30 +182,30 @@ public class BlobStoreTest {
     Subject admin = getSubject("admin");
     assertStoreHasExactly(store);
     SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-    AtomicOutputStream out = store.createBlob("test", metadata, admin);
-    assertStoreHasExactly(store, "test");
-    out.write(1);
-    out.close();
+    try (AtomicOutputStream out = store.createBlob("test", metadata, admin)) {
+      assertStoreHasExactly(store, "test");
+      out.write(1);
+    }
     store.deleteBlob("test", admin);
 
     //Test for Supervisor Admin
     Subject supervisor = getSubject("supervisor");
     assertStoreHasExactly(store);
     metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-    out = store.createBlob("test", metadata, supervisor);
-    assertStoreHasExactly(store, "test");
-    out.write(1);
-    out.close();
+    try (AtomicOutputStream out = store.c
[3/3] storm git commit: add STORM-1602 to CHANGELOG.md
add STORM-1602 to CHANGELOG.md

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ab66003c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ab66003c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ab66003c

Branch: refs/heads/master
Commit: ab66003c18fe4f8c0926b3219408b735b2ce2adf
Parents: 575203c
Author: Jungtaek Lim
Authored: Sun Mar 20 11:07:37 2016 +0900
Committer: Jungtaek Lim
Committed: Sun Mar 20 11:07:37 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/ab66003c/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d1f88b0..5f08fe2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -62,6 +62,7 @@
  * STORM-1521: When using Kerberos login from keytab with multiple bolts/executors ticket is not renewed in hbase bolt.
 
 ## 1.0.0
+ * STORM-1602: Blobstore UTs are failed on Windows
  * STORM-1629: Files/move doesn't work properly with non-empty directory in Windows
  * STORM-1549: Add support for resetting tuple timeout from bolts via the OutputCollector
  * STORM-971: Metric for messages lost due to kafka retention
[2/3] storm git commit: Merge branch 'STORM-1602'
Merge branch 'STORM-1602'

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/575203cf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/575203cf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/575203cf

Branch: refs/heads/master
Commit: 575203cf40a6b194f1f24cf9875f0284e42b999d
Parents: a726589 8dd66bf
Author: Jungtaek Lim
Authored: Sun Mar 20 11:07:13 2016 +0900
Committer: Jungtaek Lim
Committed: Sun Mar 20 11:07:13 2016 +0900

----------------------------------------------------------------------
 .../clj/org/apache/storm/daemon/supervisor.clj |   6 +-
 .../org/apache/storm/blobstore/BlobStore.java  |   5 +
 .../apache/storm/blobstore/BlobStoreTest.java  | 171 +--
 .../apache/storm/localizer/LocalizerTest.java  |   7 +-
 4 files changed, 98 insertions(+), 91 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/575203cf/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
[2/3] storm git commit: Merge branch 'STORM-1629-1.x' into 1.x-branch
Merge branch 'STORM-1629-1.x' into 1.x-branch

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dc6608db
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dc6608db
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dc6608db

Branch: refs/heads/1.x-branch
Commit: dc6608dbc781f292909d17a739e1b9a8f0acda6d
Parents: 885aaec 7b559ef
Author: Jungtaek Lim
Authored: Sun Mar 20 10:56:36 2016 +0900
Committer: Jungtaek Lim
Committed: Sun Mar 20 10:56:36 2016 +0900

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/daemon/supervisor.clj | 10 +++---
 1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
[3/3] storm git commit: add STORM-1655 to CHANGELOG.md
add STORM-1655 to CHANGELOG.md

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/915ce384
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/915ce384
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/915ce384

Branch: refs/heads/master
Commit: 915ce3848d11328005337e16f1c2f47a6773c1d9
Parents: 2e10566
Author: Jungtaek Lim
Authored: Fri Mar 25 18:08:43 2016 +0900
Committer: Jungtaek Lim
Committed: Fri Mar 25 18:08:43 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/915ce384/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4161389..b47e6a9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -65,8 +65,9 @@
  * STORM-1521: When using Kerberos login from keytab with multiple bolts/executors ticket is not renewed in hbase bolt.
 
 ## 1.0.0
+ * STORM-1655: Flux doesn't set return code to non-zero when there's any exception while deploying topology to remote cluster
  * STORM-1537: Upgrade to kryo3 in master
- * STORM-1654 HBaseBolt creates tick tuples with no interval when we don't set flushIntervalSecs
+ * STORM-1654: HBaseBolt creates tick tuples with no interval when we don't set flushIntervalSecs
  * STORM-1625: Move storm-sql dependencies out of lib folder
  * STORM-1556: nimbus.clj/wait-for-desired-code-replication wrong reset for current-replication-count-jar in local mode
  * STORM-1636: Supervisor shutdown with worker id pass in being nil
[1/3] storm git commit: STORM-1655 Flux doesn't set return code to non-zero when there's any exception while deploying topology to remote cluster
Repository: storm
Updated Branches:
  refs/heads/master 4264bfc2a -> 915ce3848

STORM-1655 Flux doesn't set return code to non-zero when there's any exception while deploying topology to remote cluster

* Modify Flux.runCli() to not catching exceptions so that exception can be propagated to main()

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ba238588
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ba238588
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ba238588

Branch: refs/heads/master
Commit: ba2385882e705ebf5c33f5a03b67dc3b14781460
Parents: df3867f
Author: Jungtaek Lim
Authored: Thu Mar 24 16:45:24 2016 +0900
Committer: Jungtaek Lim
Committed: Thu Mar 24 16:45:24 2016 +0900

----------------------------------------------------------------------
 .../main/java/org/apache/storm/flux/Flux.java | 22
 1 file changed, 9 insertions(+), 13 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/ba238588/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
index cdebd01..5848d2f 100644
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
@@ -159,20 +159,16 @@ public class Flux {
         if(!cmd.hasOption(OPTION_DRY_RUN)) {
             if (cmd.hasOption(OPTION_REMOTE)) {
                 LOG.info("Running remotely...");
-                try {
-                    // should the topology be active or inactive
-                    SubmitOptions submitOptions = null;
-                    if(cmd.hasOption(OPTION_INACTIVE)){
-                        LOG.info("Deploying topology in an INACTIVE state...");
-                        submitOptions = new SubmitOptions(TopologyInitialStatus.INACTIVE);
-                    } else {
-                        LOG.info("Deploying topology in an ACTIVE state...");
-                        submitOptions = new SubmitOptions(TopologyInitialStatus.ACTIVE);
-                    }
-                    StormSubmitter.submitTopology(topologyName, conf, topology, submitOptions, null);
-                } catch (Exception e) {
-                    LOG.warn("Unable to deploy topology to remote cluster.", e);
+                // should the topology be active or inactive
+                SubmitOptions submitOptions = null;
+                if(cmd.hasOption(OPTION_INACTIVE)){
+                    LOG.info("Deploying topology in an INACTIVE state...");
+                    submitOptions = new SubmitOptions(TopologyInitialStatus.INACTIVE);
+                } else {
+                    LOG.info("Deploying topology in an ACTIVE state...");
+                    submitOptions = new SubmitOptions(TopologyInitialStatus.ACTIVE);
                 }
+                StormSubmitter.submitTopology(topologyName, conf, topology, submitOptions, null);
             } else {
                 LOG.info("Running in local mode...");
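The effect of removing the try/catch can be modelled in a few lines of Java. Everything below is a hypothetical sketch, not Flux code: `submitTopology` stands in for a remote submission that may fail, and `exitCodeOf` imitates the Java launcher, which exits with a non-zero status (1) when `main()` terminates with an uncaught exception.

```java
final class ExitCodeSketch {

    // A CLI entry point that is allowed to throw, like a main() body.
    @FunctionalInterface
    interface Cli {
        void run() throws Exception;
    }

    // Hypothetical stand-in for a remote submission that can fail.
    static void submitTopology(boolean clusterReachable) throws Exception {
        if (!clusterReachable) {
            throw new IllegalStateException("cluster unreachable");
        }
    }

    // Behaviour before the change: the failure is logged and swallowed,
    // so the caller cannot tell that the deployment failed.
    static void runCliSwallowing(boolean clusterReachable) {
        try {
            submitTopology(clusterReachable);
        } catch (Exception e) {
            System.err.println("WARN Unable to deploy topology to remote cluster: " + e.getMessage());
        }
    }

    // Behaviour after the change: the exception propagates to the caller.
    static void runCliPropagating(boolean clusterReachable) throws Exception {
        submitTopology(clusterReachable);
    }

    // Models the process exit status: 0 on normal return, 1 on an uncaught exception.
    static int exitCodeOf(Cli cli) {
        try {
            cli.run();
            return 0;
        } catch (Exception e) {
            return 1;
        }
    }
}
```

With an unreachable cluster, the swallowing variant yields status 0 and the propagating variant yields status 1, which is what lets a shell script or CI job detect the failed deployment.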
[2/3] storm git commit: Merge branch 'STORM-1655'
Merge branch 'STORM-1655'

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2e105664
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2e105664
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2e105664

Branch: refs/heads/master
Commit: 2e1056648add8c4535d780afb44e04201eff3e52
Parents: 4264bfc ba23858
Author: Jungtaek Lim
Authored: Fri Mar 25 18:08:05 2016 +0900
Committer: Jungtaek Lim
Committed: Fri Mar 25 18:08:05 2016 +0900

----------------------------------------------------------------------
 .../main/java/org/apache/storm/flux/Flux.java | 22
 1 file changed, 9 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
[3/3] storm git commit: add STORM-1655 to CHANGELOG.md
add STORM-1655 to CHANGELOG.md

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a6140de0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a6140de0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a6140de0

Branch: refs/heads/1.x-branch
Commit: a6140de0db5fa42a55b71425f1cb063d1a715156
Parents: 91bafc6
Author: Jungtaek Lim
Authored: Fri Mar 25 18:09:27 2016 +0900
Committer: Jungtaek Lim
Committed: Fri Mar 25 18:09:27 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/a6140de0/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 51c2c9d..cdac9d8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.0.0
+ * STORM-1655: Flux doesn't set return code to non-zero when there's any exception while deploying topology to remote cluster
  * STORM-1654: HBaseBolt creates tick tuples with no interval when we don't set flushIntervalSecs
  * STORM-1625: Move storm-sql dependencies out of lib folder
  * STORM-1622: Rename classes with older third party shaded packages
[1/3] storm git commit: STORM-1655 Flux doesn't set return code to non-zero when there's any exception while deploying topology to remote cluster
Repository: storm
Updated Branches:
  refs/heads/1.x-branch 3ab224a14 -> a6140de0d

STORM-1655 Flux doesn't set return code to non-zero when there's any exception while deploying topology to remote cluster

* Modify Flux.runCli() to not catching exceptions so that exception can be propagated to main()

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/723b0873
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/723b0873
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/723b0873

Branch: refs/heads/1.x-branch
Commit: 723b08735192895b4a8972c0a8b1a49bb7aae33f
Parents: 6163f58
Author: Jungtaek Lim
Authored: Thu Mar 24 16:45:24 2016 +0900
Committer: Jungtaek Lim
Committed: Thu Mar 24 16:47:13 2016 +0900

----------------------------------------------------------------------
 .../main/java/org/apache/storm/flux/Flux.java | 22
 1 file changed, 9 insertions(+), 13 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/723b0873/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
index cdebd01..5848d2f 100644
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
@@ -159,20 +159,16 @@ public class Flux {
         if(!cmd.hasOption(OPTION_DRY_RUN)) {
             if (cmd.hasOption(OPTION_REMOTE)) {
                 LOG.info("Running remotely...");
-                try {
-                    // should the topology be active or inactive
-                    SubmitOptions submitOptions = null;
-                    if(cmd.hasOption(OPTION_INACTIVE)){
-                        LOG.info("Deploying topology in an INACTIVE state...");
-                        submitOptions = new SubmitOptions(TopologyInitialStatus.INACTIVE);
-                    } else {
-                        LOG.info("Deploying topology in an ACTIVE state...");
-                        submitOptions = new SubmitOptions(TopologyInitialStatus.ACTIVE);
-                    }
-                    StormSubmitter.submitTopology(topologyName, conf, topology, submitOptions, null);
-                } catch (Exception e) {
-                    LOG.warn("Unable to deploy topology to remote cluster.", e);
+                // should the topology be active or inactive
+                SubmitOptions submitOptions = null;
+                if(cmd.hasOption(OPTION_INACTIVE)){
+                    LOG.info("Deploying topology in an INACTIVE state...");
+                    submitOptions = new SubmitOptions(TopologyInitialStatus.INACTIVE);
+                } else {
+                    LOG.info("Deploying topology in an ACTIVE state...");
+                    submitOptions = new SubmitOptions(TopologyInitialStatus.ACTIVE);
                 }
+                StormSubmitter.submitTopology(topologyName, conf, topology, submitOptions, null);
             } else {
                 LOG.info("Running in local mode...");
[2/3] storm git commit: Merge branch 'STORM-1655-1.x' into 1.x-branch
Merge branch 'STORM-1655-1.x' into 1.x-branch

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/91bafc69
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/91bafc69
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/91bafc69

Branch: refs/heads/1.x-branch
Commit: 91bafc69d1bb48c246bd5ee5a726f485982e3961
Parents: 3ab224a 723b087
Author: Jungtaek Lim
Authored: Fri Mar 25 18:09:10 2016 +0900
Committer: Jungtaek Lim
Committed: Fri Mar 25 18:09:10 2016 +0900

----------------------------------------------------------------------
 .../main/java/org/apache/storm/flux/Flux.java | 22
 1 file changed, 9 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
[2/3] storm git commit: Merge branch 'STORM-1630-1.x' into 1.x-branch
Merge branch 'STORM-1630-1.x' into 1.x-branch

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ed719c04
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ed719c04
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ed719c04

Branch: refs/heads/1.x-branch
Commit: ed719c04880b4f743c7760a48f6b06378427db8b
Parents: 52d0a23 38acdad
Author: Jungtaek Lim
Authored: Sat Mar 26 15:21:18 2016 +0900
Committer: Jungtaek Lim
Committed: Sat Mar 26 15:21:18 2016 +0900

----------------------------------------------------------------------
 docs/Documentation.md       | 50
 docs/index.md               |  3 ++-
 docs/windows-users-guide.md | 21 +
 3 files changed, 23 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
[1/3] storm git commit: STORM-1630 "create symbolic link" needs elevation or setting privilege about creating symbolic link
Repository: storm
Updated Branches:
  refs/heads/1.x-branch 52d0a23f4 -> 139a8a3b2

STORM-1630 "create symbolic link" needs elevation or setting privilege about creating symbolic link

* Add document to guide how to setup account to be able to 'create symbolic link' on Windows

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/38acdad9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/38acdad9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/38acdad9

Branch: refs/heads/1.x-branch
Commit: 38acdad9e208a7c3d7cebceafa9776e5e2c32d80
Parents: 732982d
Author: Jungtaek Lim
Authored: Wed Mar 23 15:20:59 2016 +0900
Committer: Jungtaek Lim
Committed: Wed Mar 23 15:20:59 2016 +0900

--
 docs/Documentation.md       | 50
 docs/index.md               |  3 ++-
 docs/windows-users-guide.md | 21 +
 3 files changed, 23 insertions(+), 51 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/38acdad9/docs/Documentation.md
--
diff --git a/docs/Documentation.md b/docs/Documentation.md
deleted file mode 100644
index 8da874c..000
--- a/docs/Documentation.md
+++ /dev/null
@@ -1,50 +0,0 @@
-layout: documentation
-### Basics of Storm
-
-* [Javadoc](javadocs/index.html)
-* [Concepts](Concepts.html)
-* [Configuration](Configuration.html)
-* [Guaranteeing message processing](Guaranteeing-message-processing.html)
-* [Fault-tolerance](Fault-tolerance.html)
-* [Command line client](Command-line-client.html)
-* [Understanding the parallelism of a Storm topology](Understanding-the-parallelism-of-a-Storm-topology.html)
-* [FAQ](FAQ.html)
-
-### Trident
-
-Trident is an alternative interface to Storm. It provides exactly-once processing, "transactional" datastore persistence, and a set of common stream analytics operations.
-
-* [Trident Tutorial](Trident-tutorial.html) -- basic concepts and walkthrough
-* [Trident API Overview](Trident-API-Overview.html) -- operations for transforming and orchestrating data
-* [Trident State](Trident-state.html)-- exactly-once processing and fast, persistent aggregation
-* [Trident spouts](Trident-spouts.html) -- transactional and non-transactional data intake
-
-### Setup and deploying
-
-* [Setting up a Storm cluster](Setting-up-a-Storm-cluster.html)
-* [Local mode](Local-mode.html)
-* [Troubleshooting](Troubleshooting.html)
-* [Running topologies on a production cluster](Running-topologies-on-a-production-cluster.html)
-* [Building Storm](Maven.html) with Maven
-
-### Intermediate
-
-* [Serialization](Serialization.html)
-* [Common patterns](Common-patterns.html)
-* [Clojure DSL](Clojure-DSL.html)
-* [Using non-JVM languages with Storm](Using-non-JVM-languages-with-Storm.html)
-* [Distributed RPC](Distributed-RPC.html)
-* [Transactional topologies](Transactional-topologies.html)
-* [Kestrel and Storm](Kestrel-and-Storm.html)
-* [Direct groupings](Direct-groupings.html)
-* [Hooks](Hooks.html)
-* [Metrics](Metrics.html)
-* [Lifecycle of a trident tuple]()
-
-### Advanced
-
-* [Defining a non-JVM language DSL for Storm](Defining-a-non-jvm-language-dsl-for-storm.html)
-* [Multilang protocol](Multilang-protocol.html) (how to provide support for another language)
-* [Implementation docs](Implementation-docs.html)

http://git-wip-us.apache.org/repos/asf/storm/blob/38acdad9/docs/index.md
--
diff --git a/docs/index.md b/docs/index.md
index ff36693..8c6859e 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -40,7 +40,8 @@ Trident is an alternative interface to Storm. It provides exactly-once processin
 * [CGroup Enforcement](cgroups_in_storm.html)
 * [Pacemaker reduces load on zookeeper for large clusters](Pacemaker.html)
 * [Resource Aware Scheduler](Resource_Aware_Scheduler_overview.html)
-* [Deamon Metrics/Monitoring](storm-metrics-profiling-internal-actions.html)
+* [Daemon Metrics/Monitoring](storm-metrics-profiling-internal-actions.html)
+* [Windows users guide](windows-users-guide.html)
 ### Intermediate

http://git-wip-us.apache.org/repos/asf/storm/blob/38acdad9/docs/windows-users-guide.md
--
diff --git a/docs/windows-users-guide.md b/docs/windows-users-guide.md
new file mode 100644
index 000..c3828aa
--- /dev/null
+++ b/docs/windows-users-guide.md
@@ -0,0 +1,21 @@
++---
+
+title: Windows Users Guide
+
+layout: documentation
+
+documentation: true
+
+---
+
+
+
+This page guides how to set up environment on Windows for Apache Storm.
+
+
+
+## Symbolic Link
+
+
+
+Starting at 1.0.0, Apache Storm utilizes `symbolic link` to aggregate log directory and resource directory
[3/3] storm git commit: add STORM-1630 to CHANGELOG.md
add STORM-1630 to CHANGELOG.md

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/139a8a3b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/139a8a3b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/139a8a3b

Branch: refs/heads/1.x-branch
Commit: 139a8a3b2bea78d0d426ef7254a07fea71a493a9
Parents: ed719c0
Author: Jungtaek Lim
Authored: Sat Mar 26 15:22:06 2016 +0900
Committer: Jungtaek Lim
Committed: Sat Mar 26 15:22:06 2016 +0900

--
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/139a8a3b/CHANGELOG.md
--
diff --git a/CHANGELOG.md b/CHANGELOG.md
index cdac9d8..06eb7be 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.0.0
+ * STORM-1630: Add guide page for Windows users
  * STORM-1655: Flux doesn't set return code to non-zero when there's any exception while deploying topology to remote cluster
  * STORM-1654: HBaseBolt creates tick tuples with no interval when we don't set flushIntervalSecs
  * STORM-1625: Move storm-sql dependencies out of lib folder
[1/3] storm git commit: STORM-1630 "create symbolic link" needs elevation or setting privilege about creating symbolic link
Repository: storm
Updated Branches:
  refs/heads/master d22d8450b -> f118060dc

STORM-1630 "create symbolic link" needs elevation or setting privilege about creating symbolic link

* Add document to guide how to setup account to be able to 'create symbolic link' on Windows

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/21ed0a37
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/21ed0a37
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/21ed0a37

Branch: refs/heads/master
Commit: 21ed0a37916d878632812f4ad7d1d2d663f5bf48
Parents: a91abdb
Author: Jungtaek Lim
Authored: Wed Mar 23 15:13:42 2016 +0900
Committer: Jungtaek Lim
Committed: Wed Mar 23 15:13:42 2016 +0900

--
 docs/Documentation.md       | 50
 docs/index.md               |  3 ++-
 docs/windows-users-guide.md | 21 +
 3 files changed, 23 insertions(+), 51 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/21ed0a37/docs/Documentation.md
--
diff --git a/docs/Documentation.md b/docs/Documentation.md
deleted file mode 100644
index 8da874c..000
--- a/docs/Documentation.md
+++ /dev/null
@@ -1,50 +0,0 @@
-layout: documentation
-### Basics of Storm
-
-* [Javadoc](javadocs/index.html)
-* [Concepts](Concepts.html)
-* [Configuration](Configuration.html)
-* [Guaranteeing message processing](Guaranteeing-message-processing.html)
-* [Fault-tolerance](Fault-tolerance.html)
-* [Command line client](Command-line-client.html)
-* [Understanding the parallelism of a Storm topology](Understanding-the-parallelism-of-a-Storm-topology.html)
-* [FAQ](FAQ.html)
-
-### Trident
-
-Trident is an alternative interface to Storm. It provides exactly-once processing, "transactional" datastore persistence, and a set of common stream analytics operations.
-
-* [Trident Tutorial](Trident-tutorial.html) -- basic concepts and walkthrough
-* [Trident API Overview](Trident-API-Overview.html) -- operations for transforming and orchestrating data
-* [Trident State](Trident-state.html)-- exactly-once processing and fast, persistent aggregation
-* [Trident spouts](Trident-spouts.html) -- transactional and non-transactional data intake
-
-### Setup and deploying
-
-* [Setting up a Storm cluster](Setting-up-a-Storm-cluster.html)
-* [Local mode](Local-mode.html)
-* [Troubleshooting](Troubleshooting.html)
-* [Running topologies on a production cluster](Running-topologies-on-a-production-cluster.html)
-* [Building Storm](Maven.html) with Maven
-
-### Intermediate
-
-* [Serialization](Serialization.html)
-* [Common patterns](Common-patterns.html)
-* [Clojure DSL](Clojure-DSL.html)
-* [Using non-JVM languages with Storm](Using-non-JVM-languages-with-Storm.html)
-* [Distributed RPC](Distributed-RPC.html)
-* [Transactional topologies](Transactional-topologies.html)
-* [Kestrel and Storm](Kestrel-and-Storm.html)
-* [Direct groupings](Direct-groupings.html)
-* [Hooks](Hooks.html)
-* [Metrics](Metrics.html)
-* [Lifecycle of a trident tuple]()
-
-### Advanced
-
-* [Defining a non-JVM language DSL for Storm](Defining-a-non-jvm-language-dsl-for-storm.html)
-* [Multilang protocol](Multilang-protocol.html) (how to provide support for another language)
-* [Implementation docs](Implementation-docs.html)

http://git-wip-us.apache.org/repos/asf/storm/blob/21ed0a37/docs/index.md
--
diff --git a/docs/index.md b/docs/index.md
index ff36693..8c6859e 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -40,7 +40,8 @@ Trident is an alternative interface to Storm. It provides exactly-once processin
 * [CGroup Enforcement](cgroups_in_storm.html)
 * [Pacemaker reduces load on zookeeper for large clusters](Pacemaker.html)
 * [Resource Aware Scheduler](Resource_Aware_Scheduler_overview.html)
-* [Deamon Metrics/Monitoring](storm-metrics-profiling-internal-actions.html)
+* [Daemon Metrics/Monitoring](storm-metrics-profiling-internal-actions.html)
+* [Windows users guide](windows-users-guide.html)
 ### Intermediate

http://git-wip-us.apache.org/repos/asf/storm/blob/21ed0a37/docs/windows-users-guide.md
--
diff --git a/docs/windows-users-guide.md b/docs/windows-users-guide.md
new file mode 100644
index 000..75bb0fc
--- /dev/null
+++ b/docs/windows-users-guide.md
@@ -0,0 +1,21 @@
+---
+title: Windows Users Guide
+layout: documentation
+documentation: true
+---
+
+This page guides how to set up environment on Windows for Apache Storm.
+
+## Symbolic Link
+
+Starting at 1.0.0, Apache Storm utilizes `symbolic link` to aggregate log directory and resource directory into worker directory.
+Unfor
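The guide text above says that, starting at 1.0.0, Storm relies on symbolic links inside the worker directory, and the commit title notes that creating one on Windows needs elevation or a privilege. A minimal sketch of how one might check whether the current account can create a symbolic link, using only `java.nio.file`. The class name, the temporary directories and the link name `artifacts` are hypothetical, and this is not how Storm itself performs the check.

```java
import java.io.IOException;
import java.nio.file.FileSystemException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Probes whether the running account may create symbolic links (illustrative only). */
final class SymlinkCheckSketch {

    /**
     * Tries to create link -> target. Returns true on success and false when the
     * file system refuses, for example because the account lacks the privilege
     * to create symbolic links or the platform does not support them.
     */
    static boolean tryCreateSymlink(Path link, Path target) throws IOException {
        try {
            Files.createSymbolicLink(link, target);
            return true;
        } catch (UnsupportedOperationException | FileSystemException e) {
            // Unsupported, not permitted, or the link path is unusable.
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical stand-ins for a worker directory and a log directory.
        Path workerDir = Files.createTempDirectory("worker-sketch");
        Path logDir = Files.createTempDirectory("logs-sketch");
        boolean ok = tryCreateSymlink(workerDir.resolve("artifacts"), logDir);
        System.out.println(ok
                ? "symbolic link created"
                : "symbolic link refused; check the account's privileges");
    }
}
```

Running this under the account that will start the Storm daemons gives a quick signal of whether the setup described in the guide is still needed.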
[3/3] storm git commit: add STORM-1630 to CHANGELOG.md
add STORM-1630 to CHANGELOG.md

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f118060d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f118060d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f118060d

Branch: refs/heads/master
Commit: f118060dceee8be014dddb9311ea8ecf0425bf85
Parents: 8d1991a
Author: Jungtaek Lim
Authored: Sat Mar 26 15:25:10 2016 +0900
Committer: Jungtaek Lim
Committed: Sat Mar 26 15:25:10 2016 +0900

--
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/f118060d/CHANGELOG.md
--
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b47e6a9..5135309 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -65,6 +65,7 @@
  * STORM-1521: When using Kerberos login from keytab with multiple bolts/executors ticket is not renewed in hbase bolt.
 ## 1.0.0
+ * STORM-1630: Add guide page for Windows users
  * STORM-1655: Flux doesn't set return code to non-zero when there's any exception while deploying topology to remote cluster
  * STORM-1537: Upgrade to kryo3 in master
  * STORM-1654: HBaseBolt creates tick tuples with no interval when we don't set flushIntervalSecs
[2/3] storm git commit: Merge branch 'STORM-1630'
Merge branch 'STORM-1630'

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8d1991ad
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8d1991ad
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8d1991ad

Branch: refs/heads/master
Commit: 8d1991ad7aef9e7ec3d3cb1205ce835e8089ba59
Parents: d22d845 21ed0a3
Author: Jungtaek Lim
Authored: Sat Mar 26 15:24:54 2016 +0900
Committer: Jungtaek Lim
Committed: Sat Mar 26 15:24:54 2016 +0900

--
 docs/Documentation.md       | 50
 docs/index.md               |  3 ++-
 docs/windows-users-guide.md | 21 +
 3 files changed, 23 insertions(+), 51 deletions(-)
--
[2/2] storm git commit: Merge branch 'STORM-1659-DOCS' of https://github.com/vesense/storm into STORM-1659
Merge branch 'STORM-1659-DOCS' of https://github.com/vesense/storm into STORM-1659

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a48fae24
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a48fae24
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a48fae24

Branch: refs/heads/master
Commit: a48fae243d7bf4d374a6a3de8e9d3e46d9bd3b64
Parents: f118060 0a00c67
Author: Jungtaek Lim
Authored: Sun Mar 27 20:24:38 2016 +0900
Committer: Jungtaek Lim
Committed: Sun Mar 27 20:24:38 2016 +0900

--
 docs/Kestrel-and-Storm.md   |   2 +-
 docs/index.md               |  14 +-
 docs/storm-cassandra.md     | 255 ++
 docs/storm-elasticsearch.md | 105 +++
 docs/storm-mongodb.md       | 199
 docs/storm-mqtt.md          | 379 +++
 6 files changed, 948 insertions(+), 6 deletions(-)
--
[1/2] storm git commit: STORM-1659:Add documents for external projects
Repository: storm
Updated Branches:
  refs/heads/master f118060dc -> a48fae243

STORM-1659:Add documents for external projects

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0a00c671
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0a00c671
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0a00c671

Branch: refs/heads/master
Commit: 0a00c67170db83da3f5c29dcecf6f185fd066b9b
Parents: f118060
Author: Xin Wang
Authored: Sun Mar 27 16:04:38 2016 +0800
Committer: Xin Wang
Committed: Sun Mar 27 16:10:53 2016 +0800

--
 docs/Kestrel-and-Storm.md   |   2 +-
 docs/index.md               |  14 +-
 docs/storm-cassandra.md     | 255 ++
 docs/storm-elasticsearch.md | 105 +++
 docs/storm-mongodb.md       | 199
 docs/storm-mqtt.md          | 379 +++
 6 files changed, 948 insertions(+), 6 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/0a00c671/docs/Kestrel-and-Storm.md
--
diff --git a/docs/Kestrel-and-Storm.md b/docs/Kestrel-and-Storm.md
index cd584ff..ff48995 100644
--- a/docs/Kestrel-and-Storm.md
+++ b/docs/Kestrel-and-Storm.md
@@ -3,7 +3,7 @@ title: Storm and Kestrel
 layout: documentation
 documentation: true
 ---
-This page explains how to use to Storm to consume items from a Kestrel cluster.
+This page explains how to use Storm to consume items from a Kestrel cluster.
 ## Preliminaries
 ### Storm

http://git-wip-us.apache.org/repos/asf/storm/blob/0a00c671/docs/index.md
--
diff --git a/docs/index.md b/docs/index.md
index 8c6859e..bfd68db 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -64,15 +64,19 @@ Trident is an alternative interface to Storm. It provides exactly-once processin
 * [Worker Profiling](dynamic-worker-profiling.html)
 ### Integration With External Systems, and Other Libraries
-* [Event Hubs Intergration](storm-eventhubs.html)
+* [Apache Kafka Integration](storm-kafka.html)
 * [Apache HBase Integration](storm-hbase.html)
 * [Apache HDFS Integration](storm-hdfs.html)
 * [Apache Hive Integration](storm-hive.html)
+* [Apache Solr Integration](storm-solr.html)
 * [JDBC Integration](storm-jdbc.html)
-* [Apache Kafka Integration](storm-kafka.html)
-* [REDIS Integration](storm-redis.html)
-* [Kestrel and Storm](Kestrel-and-Storm.html)
-* [Solr Integration](storm-solr.html)
+* [Redis Integration](storm-redis.html)
+* [Cassandra Integration](storm-cassandra.html)
+* [Event Hubs Intergration](storm-eventhubs.html)
+* [Elasticsearch Integration](storm-elasticsearch.html)
+* [MQTT Integration](storm-mqtt.html)
+* [Mongodb Integration](storm-mongodb.html)
+* [Kestrel Integration](Kestrel-and-Storm.html)
 ### Advanced

http://git-wip-us.apache.org/repos/asf/storm/blob/0a00c671/docs/storm-cassandra.md
--
diff --git a/docs/storm-cassandra.md b/docs/storm-cassandra.md
new file mode 100644
index 000..c674fbc
--- /dev/null
+++ b/docs/storm-cassandra.md
@@ -0,0 +1,255 @@
+---
+title: Storm Cassandra Integration
+layout: documentation
+documentation: true
+---
+
+### Bolt API implementation for Apache Cassandra
+
+This library provides core storm bolt on top of Apache Cassandra.
+Provides simple DSL to map storm *Tuple* to Cassandra Query Language *Statement*.
+
+
+### Configuration
+The following properties may be passed to storm configuration.
+
+| **Property name**| **Description** | **Default** |
+| -| | |
+| **cassandra.keyspace** | - | |
+| **cassandra.nodes** | - | {"localhost"} |
+| **cassandra.username** | - | - |
+| **cassandra.password** | - | - |
+| **cassandra.port** | - | 9092 |
+| **cassandra.output.consistencyLevel**| - | ONE |
+| **cassandra.batch.size.rows**| - | 100 |
+| **cassandra.retryPolicy**| - | DefaultRetryPolicy |
+| **cassandra.reconnectionPolicy.baseDelayMs** | - | 100 (ms) |
+| **cassandra.reconnectionPolicy.maxDelayMs** | - | 6 (ms) |
+
+### CassandraWriterBolt
+
+Static import
+```java
+
+import static org.apache.storm.cassandra.DynamicStatementBuilder.*
+
+```
+
+ Insert Query Builde
[1/2] storm git commit: STORM-1659:Add documents for external projects
Repository: storm
Updated Branches:
  refs/heads/1.x-branch 139a8a3b2 -> 1783d7ae9

STORM-1659:Add documents for external projects

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ad1d7324
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ad1d7324
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ad1d7324

Branch: refs/heads/1.x-branch
Commit: ad1d732459d3e2bb3f67484f625af1fa589fd94f
Parents: 139a8a3
Author: Xin Wang
Authored: Sun Mar 27 16:04:38 2016 +0800
Committer: Xin Wang
Committed: Sun Mar 27 16:42:04 2016 +0800

--
 docs/Kestrel-and-Storm.md   |   2 +-
 docs/index.md               |  14 +-
 docs/storm-cassandra.md     | 255 ++
 docs/storm-elasticsearch.md | 105 +++
 docs/storm-mongodb.md       | 199
 docs/storm-mqtt.md          | 379 +++
 6 files changed, 948 insertions(+), 6 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/ad1d7324/docs/Kestrel-and-Storm.md
--
diff --git a/docs/Kestrel-and-Storm.md b/docs/Kestrel-and-Storm.md
index cd584ff..ff48995 100644
--- a/docs/Kestrel-and-Storm.md
+++ b/docs/Kestrel-and-Storm.md
@@ -3,7 +3,7 @@ title: Storm and Kestrel
 layout: documentation
 documentation: true
 ---
-This page explains how to use to Storm to consume items from a Kestrel cluster.
+This page explains how to use Storm to consume items from a Kestrel cluster.
 ## Preliminaries
 ### Storm

http://git-wip-us.apache.org/repos/asf/storm/blob/ad1d7324/docs/index.md
--
diff --git a/docs/index.md b/docs/index.md
index 8c6859e..bfd68db 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -64,15 +64,19 @@ Trident is an alternative interface to Storm. It provides exactly-once processin
 * [Worker Profiling](dynamic-worker-profiling.html)
 ### Integration With External Systems, and Other Libraries
-* [Event Hubs Intergration](storm-eventhubs.html)
+* [Apache Kafka Integration](storm-kafka.html)
 * [Apache HBase Integration](storm-hbase.html)
 * [Apache HDFS Integration](storm-hdfs.html)
 * [Apache Hive Integration](storm-hive.html)
+* [Apache Solr Integration](storm-solr.html)
 * [JDBC Integration](storm-jdbc.html)
-* [Apache Kafka Integration](storm-kafka.html)
-* [REDIS Integration](storm-redis.html)
-* [Kestrel and Storm](Kestrel-and-Storm.html)
-* [Solr Integration](storm-solr.html)
+* [Redis Integration](storm-redis.html)
+* [Cassandra Integration](storm-cassandra.html)
+* [Event Hubs Intergration](storm-eventhubs.html)
+* [Elasticsearch Integration](storm-elasticsearch.html)
+* [MQTT Integration](storm-mqtt.html)
+* [Mongodb Integration](storm-mongodb.html)
+* [Kestrel Integration](Kestrel-and-Storm.html)
 ### Advanced

http://git-wip-us.apache.org/repos/asf/storm/blob/ad1d7324/docs/storm-cassandra.md
--
diff --git a/docs/storm-cassandra.md b/docs/storm-cassandra.md
new file mode 100644
index 000..c674fbc
--- /dev/null
+++ b/docs/storm-cassandra.md
@@ -0,0 +1,255 @@
+---
+title: Storm Cassandra Integration
+layout: documentation
+documentation: true
+---
+
+### Bolt API implementation for Apache Cassandra
+
+This library provides core storm bolt on top of Apache Cassandra.
+Provides simple DSL to map storm *Tuple* to Cassandra Query Language *Statement*.
+
+
+### Configuration
+The following properties may be passed to storm configuration.
+
+| **Property name**| **Description** | **Default** |
+| -| | |
+| **cassandra.keyspace** | - | |
+| **cassandra.nodes** | - | {"localhost"} |
+| **cassandra.username** | - | - |
+| **cassandra.password** | - | - |
+| **cassandra.port** | - | 9092 |
+| **cassandra.output.consistencyLevel**| - | ONE |
+| **cassandra.batch.size.rows**| - | 100 |
+| **cassandra.retryPolicy**| - | DefaultRetryPolicy |
+| **cassandra.reconnectionPolicy.baseDelayMs** | - | 100 (ms) |
+| **cassandra.reconnectionPolicy.maxDelayMs** | - | 6 (ms) |
+
+### CassandraWriterBolt
+
+Static import
+```java
+
+import static org.apache.storm.cassandra.DynamicStatementBuilder.*
+
+```
+
+ Insert Quer
[2/2] storm git commit: Merge branch 'STORM-1659-1.x' of https://github.com/vesense/storm into STORM-1659-1.x
Merge branch 'STORM-1659-1.x' of https://github.com/vesense/storm into STORM-1659-1.x

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1783d7ae
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1783d7ae
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1783d7ae

Branch: refs/heads/1.x-branch
Commit: 1783d7ae92bd7c282ac0ca7c4f46e15122e91197
Parents: 139a8a3 ad1d732
Author: Jungtaek Lim
Authored: Sun Mar 27 20:28:16 2016 +0900
Committer: Jungtaek Lim
Committed: Sun Mar 27 20:28:16 2016 +0900

--
 docs/Kestrel-and-Storm.md   |   2 +-
 docs/index.md               |  14 +-
 docs/storm-cassandra.md     | 255 ++
 docs/storm-elasticsearch.md | 105 +++
 docs/storm-mongodb.md       | 199
 docs/storm-mqtt.md          | 379 +++
 6 files changed, 948 insertions(+), 6 deletions(-)
--
storm git commit: add STORM-1030 to CHANGELOG.md
Repository: storm
Updated Branches:
  refs/heads/master da7969ea9 -> 4eaaa0bb1

add STORM-1030 to CHANGELOG.md

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4eaaa0bb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4eaaa0bb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4eaaa0bb

Branch: refs/heads/master
Commit: 4eaaa0bb19261020b67c820bd4aee3802a07e025
Parents: da7969e
Author: Jungtaek Lim
Authored: Tue Mar 29 16:42:04 2016 +0900
Committer: Jungtaek Lim
Committed: Tue Mar 29 16:42:04 2016 +0900

--
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/4eaaa0bb/CHANGELOG.md
--
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b2f30ff..be17468 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -65,6 +65,7 @@
  * STORM-1521: When using Kerberos login from keytab with multiple bolts/executors ticket is not renewed in hbase bolt.
 ## 1.0.0
+ * STORM-1030: Hive Connector Fixes
  * STORM-676: Storm Trident support for sliding/tumbling windows
  * STORM-1630: Add guide page for Windows users
  * STORM-1655: Flux doesn't set return code to non-zero when there's any exception while deploying topology to remote cluster
[2/2] storm git commit: add STORM-1030 to CHANGELOG.md
add STORM-1030 to CHANGELOG.md

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1cfaf124
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1cfaf124
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1cfaf124

Branch: refs/heads/1.x-branch
Commit: 1cfaf12452e130978951b6c7aea6b2c29c340d7a
Parents: a276616
Author: Jungtaek Lim
Authored: Tue Mar 29 17:00:10 2016 +0900
Committer: Jungtaek Lim
Committed: Tue Mar 29 17:00:10 2016 +0900

--
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/1cfaf124/CHANGELOG.md
--
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ae99355..3c36a3b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.0.0
+ * STORM-1030: Hive Connector Fixes
  * STORM-676: Storm Trident support for sliding/tumbling windows
  * STORM-1630: Add guide page for Windows users
  * STORM-1655: Flux doesn't set return code to non-zero when there's any exception while deploying topology to remote cluster
[1/2] storm git commit: STORM-1030. Hive Connector Fixes.
Repository: storm
Updated Branches:
  refs/heads/1.x-branch 7edd00a36 -> 1cfaf1245

STORM-1030. Hive Connector Fixes.

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a276616b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a276616b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a276616b

Branch: refs/heads/1.x-branch
Commit: a276616bedbbee28ca318059853ae0e94d06408b
Parents: 7edd00a
Author: Sriharsha Chintalapani
Authored: Mon Nov 9 16:40:34 2015 -0800
Committer: Jungtaek Lim
Committed: Tue Mar 29 16:43:09 2016 +0900

--
 .../org/apache/storm/hive/bolt/HiveBolt.java    | 147 --
 .../apache/storm/hive/common/HiveOptions.java   |   8 +-
 .../org/apache/storm/hive/common/HiveUtils.java |  11 +-
 .../apache/storm/hive/common/HiveWriter.java    | 127 -
 .../apache/storm/hive/trident/HiveState.java    |  38 ++--
 .../storm/hive/trident/HiveStateFactory.java    |   1 +
 .../apache/storm/hive/trident/HiveUpdater.java  |   1 +
 .../storm/hive/bolt/BucketTestHiveTopology.java | 190 +++
 .../apache/storm/hive/bolt/HiveTopology.java    |   6 +-
 .../apache/storm/hive/bolt/TestHiveBolt.java    |  11 +-
 .../storm/hive/common/TestHiveWriter.java       |  13 +-
 .../storm/hive/trident/TridentHiveTopology.java |   2 +-
 12 files changed, 415 insertions(+), 140 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/a276616b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
--
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
index 0646dcb..ef06e4b 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Map;
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.Map.Entry;
@@ -56,14 +56,14 @@ public class HiveBolt extends BaseRichBolt {
 private ExecutorService callTimeoutPool;
 private transient Timer heartBeatTimer;
 private Boolean kerberosEnabled = false;
-private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
+private AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
 private UserGroupInformation ugi = null;
-HashMap allWriters;
+private Map allWriters;
 private List tupleBatch;
 public HiveBolt(HiveOptions options) {
 this.options = options;
-tupleBatch = new LinkedList<>();
+tupleBatch = new LinkedList();
 }
 @Override
@@ -87,10 +87,12 @@ public class HiveBolt extends BaseRichBolt {
 }
 }
 this.collector = collector;
-allWriters = new HashMap();
+allWriters = new ConcurrentHashMap();
 String timeoutName = "hive-bolt-%d";
 this.callTimeoutPool = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
+
+sendHeartBeat.set(true);
 heartBeatTimer = new Timer();
 setupHeartBeatTimer();
@@ -105,44 +107,37 @@ public class HiveBolt extends BaseRichBolt {
 boolean forceFlush = false;
 if (TupleUtils.isTick(tuple)) {
 LOG.debug("TICK received! current batch status [{}/{}]", tupleBatch.size(), options.getBatchSize());
-collector.ack(tuple);
 forceFlush = true;
-}
-else {
+} else {
 List partitionVals = options.getMapper().mapPartitions(tuple);
 HiveEndPoint endPoint = HiveUtils.makeEndPoint(partitionVals, options);
 HiveWriter writer = getOrCreateWriter(endPoint);
-if (timeToSendHeartBeat.compareAndSet(true, false)) {
-enableHeartBeatOnAllWriters();
-}
 writer.write(options.getMapper().mapRecord(tuple));
 tupleBatch.add(tuple);
 if (tupleBatch.size() >= options.getBatchSize())
 forceFlush = true;
 }
+
 if(forceFlush && !tupleBatch.isEmpty()) {
 flushAllWriters(true);
 LOG.info("acknowledging tuples after writers flushed ");
-for(Tuple t : tupleBatch)
+for(Tuple t : tupleBatch) {
 collector.ack(t);
+}
 tupleBatch.clear();
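The `execute` hunk above buffers tuples, forces a flush on a tick tuple or a full batch, and acknowledges the buffered tuples only after the writers have flushed. A minimal sketch of that pattern follows, with plain strings standing in for Storm tuples. `BatchAckSketch`, its `acked` list and its `flushCount` counter are hypothetical stand-ins for `collector.ack(t)` and `flushAllWriters(true)`; none of this is the HiveBolt implementation itself.

```java
import java.util.ArrayList;
import java.util.List;

/** Buffers items, flushes on a tick or a full batch, and acks only after the flush. */
final class BatchAckSketch {
    private final int batchSize;
    private final List<String> batch = new ArrayList<>();

    /** Stand-in for collector.ack(t): records which items were acknowledged. */
    final List<String> acked = new ArrayList<>();

    /** Stand-in for flushAllWriters(true): counts how many flushes happened. */
    int flushCount = 0;

    BatchAckSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    /** A tick carries no data and only forces a flush of whatever is buffered. */
    void execute(String tuple, boolean isTick) {
        boolean forceFlush = isTick;
        if (!isTick) {
            batch.add(tuple);
            if (batch.size() >= batchSize) {
                forceFlush = true;
            }
        }
        // Nothing is flushed or acknowledged while the batch is empty.
        if (forceFlush && !batch.isEmpty()) {
            flushCount++;          // flush first ...
            acked.addAll(batch);   // ... then acknowledge everything that was flushed
            batch.clear();
        }
    }

    public static void main(String[] args) {
        BatchAckSketch bolt = new BatchAckSketch(3);
        bolt.execute("a", false);
        bolt.execute("b", false);
        bolt.execute(null, true);  // tick: flushes and acks "a" and "b"
        System.out.println("acked=" + bolt.acked + " flushes=" + bolt.flushCount);
    }
}
```

Acknowledging after the flush means a failure before the flush leaves the buffered tuples unacknowledged, so Storm can replay them.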
[3/3] storm git commit: Merge branch 'STORM-1030'
Merge branch 'STORM-1030'

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e2ca82f8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e2ca82f8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e2ca82f8

Branch: refs/heads/master
Commit: e2ca82f8f55ecdaeac31d3da6c1fc634d9bc4b25
Parents: 4eaaa0b 2eb80a0
Author: Jungtaek Lim
Authored: Tue Mar 29 17:01:22 2016 +0900
Committer: Jungtaek Lim
Committed: Tue Mar 29 17:01:22 2016 +0900

--
 .../org/apache/storm/hive/bolt/HiveBolt.java    | 147 --
 .../apache/storm/hive/common/HiveOptions.java   |   8 +-
 .../org/apache/storm/hive/common/HiveUtils.java |  11 +-
 .../apache/storm/hive/common/HiveWriter.java    | 127 -
 .../apache/storm/hive/trident/HiveState.java    |  38 ++--
 .../storm/hive/trident/HiveStateFactory.java    |   1 +
 .../apache/storm/hive/trident/HiveUpdater.java  |   1 +
 .../storm/hive/bolt/BucketTestHiveTopology.java | 190 +++
 .../apache/storm/hive/bolt/HiveTopology.java    |   6 +-
 .../apache/storm/hive/bolt/TestHiveBolt.java    |  11 +-
 .../storm/hive/common/TestHiveWriter.java       |  13 +-
 .../storm/hive/trident/TridentHiveTopology.java |   2 +-
 12 files changed, 415 insertions(+), 140 deletions(-)
--
[1/3] storm git commit: STORM-1030. Hive Connector Fixes.
Repository: storm
Updated Branches:
  refs/heads/master 4eaaa0bb1 -> e2ca82f8f

STORM-1030. Hive Connector Fixes.

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e5f0e91f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e5f0e91f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e5f0e91f

Branch: refs/heads/master
Commit: e5f0e91fcf7d377817982efd80f6e03d73dc371b
Parents: c2cf3be
Author: Sriharsha Chintalapani
Authored: Mon Nov 9 16:40:34 2015 -0800
Committer: Sriharsha Chintalapani
Committed: Mon Mar 28 13:05:52 2016 -0700

--
 .../org/apache/storm/hive/bolt/HiveBolt.java    | 147 --
 .../apache/storm/hive/common/HiveOptions.java   |   8 +-
 .../org/apache/storm/hive/common/HiveUtils.java |  11 +-
 .../apache/storm/hive/common/HiveWriter.java    | 127 -
 .../apache/storm/hive/trident/HiveState.java    |  38 ++--
 .../storm/hive/trident/HiveStateFactory.java    |   1 +
 .../apache/storm/hive/trident/HiveUpdater.java  |   1 +
 .../storm/hive/bolt/BucketTestHiveTopology.java | 190 +++
 .../apache/storm/hive/bolt/HiveTopology.java    |   6 +-
 .../apache/storm/hive/bolt/TestHiveBolt.java    |  11 +-
 .../storm/hive/common/TestHiveWriter.java       |  13 +-
 .../storm/hive/trident/TridentHiveTopology.java |   2 +-
 12 files changed, 415 insertions(+), 140 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/e5f0e91f/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
--
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
index 0646dcb..ef06e4b 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Map;
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.Map.Entry;
@@ -56,14 +56,14 @@ public class HiveBolt extends BaseRichBolt {
 private ExecutorService callTimeoutPool;
 private transient Timer heartBeatTimer;
 private Boolean kerberosEnabled = false;
-private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
+private AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
 private UserGroupInformation ugi = null;
-HashMap allWriters;
+private Map allWriters;
 private List tupleBatch;
 public HiveBolt(HiveOptions options) {
 this.options = options;
-tupleBatch = new LinkedList<>();
+tupleBatch = new LinkedList();
 }
 @Override
@@ -87,10 +87,12 @@ public class HiveBolt extends BaseRichBolt {
 }
 }
 this.collector = collector;
-allWriters = new HashMap();
+allWriters = new ConcurrentHashMap();
 String timeoutName = "hive-bolt-%d";
 this.callTimeoutPool = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
+
+sendHeartBeat.set(true);
 heartBeatTimer = new Timer();
 setupHeartBeatTimer();
@@ -105,44 +107,37 @@ public class HiveBolt extends BaseRichBolt {
 boolean forceFlush = false;
 if (TupleUtils.isTick(tuple)) {
 LOG.debug("TICK received! current batch status [{}/{}]", tupleBatch.size(), options.getBatchSize());
-collector.ack(tuple);
 forceFlush = true;
-}
-else {
+} else {
 List partitionVals = options.getMapper().mapPartitions(tuple);
 HiveEndPoint endPoint = HiveUtils.makeEndPoint(partitionVals, options);
 HiveWriter writer = getOrCreateWriter(endPoint);
-if (timeToSendHeartBeat.compareAndSet(true, false)) {
-enableHeartBeatOnAllWriters();
-}
 writer.write(options.getMapper().mapRecord(tuple));
 tupleBatch.add(tuple);
 if (tupleBatch.size() >= options.getBatchSize())
 forceFlush = true;
 }
+
 if(forceFlush && !tupleBatch.isEmpty()) {
 flushAllWriters(true);
 LOG.info("acknowledging tuples after writers flushed ");
-for(Tuple t : tupleBatch)
+for(Tuple t : tupleBatch) {
 collector.ack(t);
+}
 tupleBatch.clear()
[2/3] storm git commit: Merge branch 'STORM-1030-V1' of https://github.com/harshach/incubator-storm into STORM-1030
Merge branch 'STORM-1030-V1' of https://github.com/harshach/incubator-storm into STORM-1030

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2eb80a0f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2eb80a0f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2eb80a0f

Branch: refs/heads/master
Commit: 2eb80a0fd783a1bec3192802bf8e01d6ff152d74
Parents: da7969e e5f0e91
Author: Jungtaek Lim
Authored: Tue Mar 29 16:41:29 2016 +0900
Committer: Jungtaek Lim
Committed: Tue Mar 29 16:41:29 2016 +0900

--
 .../org/apache/storm/hive/bolt/HiveBolt.java | 147 --
 .../apache/storm/hive/common/HiveOptions.java | 8 +-
 .../org/apache/storm/hive/common/HiveUtils.java | 11 +-
 .../apache/storm/hive/common/HiveWriter.java | 127 -
 .../apache/storm/hive/trident/HiveState.java | 38 ++--
 .../storm/hive/trident/HiveStateFactory.java | 1 +
 .../apache/storm/hive/trident/HiveUpdater.java | 1 +
 .../storm/hive/bolt/BucketTestHiveTopology.java | 190 +++
 .../apache/storm/hive/bolt/HiveTopology.java | 6 +-
 .../apache/storm/hive/bolt/TestHiveBolt.java | 11 +-
 .../storm/hive/common/TestHiveWriter.java | 13 +-
 .../storm/hive/trident/TridentHiveTopology.java | 2 +-
 12 files changed, 415 insertions(+), 140 deletions(-)
--
[3/3] storm git commit: add STORM-1573 to CHANGELOG.md
add STORM-1573 to CHANGELOG.md

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0e4af514
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0e4af514
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0e4af514

Branch: refs/heads/1.x-branch
Commit: 0e4af514d786cd8d8ad5411405de34ed07ea436a
Parents: fac10d7
Author: Jungtaek Lim
Authored: Thu Mar 31 14:47:16 2016 +0900
Committer: Jungtaek Lim
Committed: Thu Mar 31 14:47:16 2016 +0900

--
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/0e4af514/CHANGELOG.md
--
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c37ef4a..89391a0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.0.0
+ * STORM-1573: Add batch support for MongoInsertBolt
  * STORM-1660: remove flux gitignore file and move rules to top level gitignore
  * STORM-1622: Rename classes with older third party shaded packages
  * STORM-1537: Upgrade to kryo 3
[1/3] storm git commit: STORM-1573: Add batch support for MongoInsertBolt
Repository: storm Updated Branches: refs/heads/1.x-branch 652d2f6eb -> 0e4af514d STORM-1573: Add batch support for MongoInsertBolt Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bd6f2b06 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bd6f2b06 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bd6f2b06 Branch: refs/heads/1.x-branch Commit: bd6f2b06e81b7aa296615b4541cffba9655bcd86 Parents: 139a8a3 Author: vesense Authored: Thu Mar 17 20:51:51 2016 +0800 Committer: vesense Committed: Thu Mar 31 12:48:50 2016 +0800 -- .../storm/mongodb/bolt/MongoInsertBolt.java | 74 ++-- .../storm/mongodb/bolt/MongoUpdateBolt.java | 3 +- .../storm/mongodb/common/MongoDBClient.java | 20 +++--- .../storm/mongodb/trident/state/MongoState.java | 2 +- 4 files changed, 80 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/bd6f2b06/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java -- diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java index 26cd150..a030a6c 100644 --- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java +++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java @@ -17,11 +17,18 @@ */ package org.apache.storm.mongodb.bolt; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + import org.apache.commons.lang.Validate; import org.apache.storm.mongodb.common.mapper.MongoMapper; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple; +import org.apache.storm.utils.TupleUtils; import org.bson.Document; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Basic bolt for writing to MongoDB. 
@@ -30,30 +37,87 @@ import org.bson.Document; * */ public class MongoInsertBolt extends AbstractMongoBolt { +private static final Logger LOG = LoggerFactory.getLogger(MongoInsertBolt.class); + +private static final int DEFAULT_FLUSH_INTERVAL_SECS = 1; private MongoMapper mapper; +private boolean ordered = true; //default is ordered. + +private int batchSize = 15000; + +private List tupleBatch; + +private int flushIntervalSecs = DEFAULT_FLUSH_INTERVAL_SECS; + public MongoInsertBolt(String url, String collectionName, MongoMapper mapper) { super(url, collectionName); Validate.notNull(mapper, "MongoMapper can not be null"); this.mapper = mapper; + +this.tupleBatch = new LinkedList<>(); } @Override public void execute(Tuple tuple) { +boolean forceFlush = false; try{ -//get document -Document doc = mapper.toDocument(tuple); -mongoClient.insert(doc); -this.collector.ack(tuple); +if (TupleUtils.isTick(tuple)) { +LOG.debug("TICK received! current batch status [{}/{}]", tupleBatch.size(), batchSize); +collector.ack(tuple); +forceFlush = true; +} else { +tupleBatch.add(tuple); +if (tupleBatch.size() >= batchSize) { +forceFlush = true; +} +} + +if(forceFlush && !tupleBatch.isEmpty()) { +List docs = new LinkedList<>(); +for (Tuple t : tupleBatch) { +Document doc = mapper.toDocument(t); +docs.add(doc); +} +mongoClient.insert(docs, ordered); + +for(Tuple t : tupleBatch) { +collector.ack(t); +} +tupleBatch.clear(); +} } catch (Exception e) { this.collector.reportError(e); -this.collector.fail(tuple); +for (Tuple t : tupleBatch) { +collector.fail(t); +} +tupleBatch.clear(); } } +public MongoInsertBolt withBatchSize(int batchSize) { +this.batchSize = batchSize; +return this; +} + +public MongoInsertBolt withOrdered(boolean ordered) { +this.ordered = ordered; +return this; +} + +public MongoInsertBolt withFlushIntervalSecs(int flushIntervalSecs) { +this.flushIntervalSecs = flushIntervalSecs; +return this; +} + +@Override +public Map getComponentConfiguration() { +return 
TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalSe
[2/3] storm git commit: Merge branch 'STORM-1573-1.x' of https://github.com/vesense/storm into STORM-1573-1.x
Merge branch 'STORM-1573-1.x' of https://github.com/vesense/storm into STORM-1573-1.x

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fac10d78
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fac10d78
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fac10d78

Branch: refs/heads/1.x-branch
Commit: fac10d78e65053e416f5d85378971a3a4f2dcafe
Parents: 652d2f6 bd6f2b0
Author: Jungtaek Lim
Authored: Thu Mar 31 14:46:41 2016 +0900
Committer: Jungtaek Lim
Committed: Thu Mar 31 14:46:41 2016 +0900

--
 .../storm/mongodb/bolt/MongoInsertBolt.java | 74 ++--
 .../storm/mongodb/bolt/MongoUpdateBolt.java | 3 +-
 .../storm/mongodb/common/MongoDBClient.java | 20 +++---
 .../storm/mongodb/trident/state/MongoState.java | 2 +-
 4 files changed, 80 insertions(+), 19 deletions(-)
--
[2/3] storm git commit: Merge branch 'STORM-1573' of https://github.com/vesense/storm into STORM-1573
Merge branch 'STORM-1573' of https://github.com/vesense/storm into STORM-1573

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/68250d2a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/68250d2a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/68250d2a

Branch: refs/heads/master
Commit: 68250d2a22052ecda16864db7fdf982dcafd9048
Parents: 2f1411d 78f1706
Author: Jungtaek Lim
Authored: Thu Mar 31 14:23:33 2016 +0900
Committer: Jungtaek Lim
Committed: Thu Mar 31 14:23:33 2016 +0900

--
 .../storm/mongodb/bolt/MongoInsertBolt.java | 74 ++--
 .../storm/mongodb/bolt/MongoUpdateBolt.java | 3 +-
 .../storm/mongodb/common/MongoDBClient.java | 20 +++---
 .../storm/mongodb/trident/state/MongoState.java | 2 +-
 4 files changed, 80 insertions(+), 19 deletions(-)
--
[3/3] storm git commit: add STORM-1573 to CHANGELOG.md
add STORM-1573 to CHANGELOG.md

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/98dbdcdb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/98dbdcdb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/98dbdcdb

Branch: refs/heads/master
Commit: 98dbdcdb5cf63e3d92b7b67137f44911673d8d8f
Parents: 68250d2
Author: Jungtaek Lim
Authored: Thu Mar 31 14:45:44 2016 +0900
Committer: Jungtaek Lim
Committed: Thu Mar 31 14:45:44 2016 +0900

--
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/98dbdcdb/CHANGELOG.md
--
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7fb2a45..2a54b3b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -67,6 +67,7 @@
  * STORM-1521: When using Kerberos login from keytab with multiple bolts/executors ticket is not renewed in hbase bolt.
 ## 1.0.0
+ * STORM-1573: Add batch support for MongoInsertBolt
  * STORM-1660: remove flux gitignore file and move rules to top level gitignore
  * STORM-1634: Refactoring of Resource Aware Scheduler
  * STORM-1030: Hive Connector Fixes
[1/3] storm git commit: STORM-1573: Add batch support for MongoInsertBolt
Repository: storm Updated Branches: refs/heads/master 2f1411da3 -> 98dbdcdb5 STORM-1573: Add batch support for MongoInsertBolt Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/78f17061 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/78f17061 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/78f17061 Branch: refs/heads/master Commit: 78f17061f7af82e82b011dcadff0829cad202227 Parents: 50701df Author: vesense Authored: Thu Mar 17 20:51:51 2016 +0800 Committer: vesense Committed: Thu Mar 31 12:45:59 2016 +0800 -- .../storm/mongodb/bolt/MongoInsertBolt.java | 74 ++-- .../storm/mongodb/bolt/MongoUpdateBolt.java | 3 +- .../storm/mongodb/common/MongoDBClient.java | 20 +++--- .../storm/mongodb/trident/state/MongoState.java | 2 +- 4 files changed, 80 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/78f17061/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java -- diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java index 26cd150..a030a6c 100644 --- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java +++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java @@ -17,11 +17,18 @@ */ package org.apache.storm.mongodb.bolt; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + import org.apache.commons.lang.Validate; import org.apache.storm.mongodb.common.mapper.MongoMapper; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple; +import org.apache.storm.utils.TupleUtils; import org.bson.Document; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Basic bolt for writing to MongoDB. 
@@ -30,30 +37,87 @@ import org.bson.Document; * */ public class MongoInsertBolt extends AbstractMongoBolt { +private static final Logger LOG = LoggerFactory.getLogger(MongoInsertBolt.class); + +private static final int DEFAULT_FLUSH_INTERVAL_SECS = 1; private MongoMapper mapper; +private boolean ordered = true; //default is ordered. + +private int batchSize = 15000; + +private List tupleBatch; + +private int flushIntervalSecs = DEFAULT_FLUSH_INTERVAL_SECS; + public MongoInsertBolt(String url, String collectionName, MongoMapper mapper) { super(url, collectionName); Validate.notNull(mapper, "MongoMapper can not be null"); this.mapper = mapper; + +this.tupleBatch = new LinkedList<>(); } @Override public void execute(Tuple tuple) { +boolean forceFlush = false; try{ -//get document -Document doc = mapper.toDocument(tuple); -mongoClient.insert(doc); -this.collector.ack(tuple); +if (TupleUtils.isTick(tuple)) { +LOG.debug("TICK received! current batch status [{}/{}]", tupleBatch.size(), batchSize); +collector.ack(tuple); +forceFlush = true; +} else { +tupleBatch.add(tuple); +if (tupleBatch.size() >= batchSize) { +forceFlush = true; +} +} + +if(forceFlush && !tupleBatch.isEmpty()) { +List docs = new LinkedList<>(); +for (Tuple t : tupleBatch) { +Document doc = mapper.toDocument(t); +docs.add(doc); +} +mongoClient.insert(docs, ordered); + +for(Tuple t : tupleBatch) { +collector.ack(t); +} +tupleBatch.clear(); +} } catch (Exception e) { this.collector.reportError(e); -this.collector.fail(tuple); +for (Tuple t : tupleBatch) { +collector.fail(t); +} +tupleBatch.clear(); } } +public MongoInsertBolt withBatchSize(int batchSize) { +this.batchSize = batchSize; +return this; +} + +public MongoInsertBolt withOrdered(boolean ordered) { +this.ordered = ordered; +return this; +} + +public MongoInsertBolt withFlushIntervalSecs(int flushIntervalSecs) { +this.flushIntervalSecs = flushIntervalSecs; +return this; +} + +@Override +public Map getComponentConfiguration() { +return 
TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalSecs); +
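For readers skimming the diff above, the batching logic added to `execute` can be sketched without any Storm or MongoDB dependency. This is an illustration only, not the committed code: `BatchBuffer`, `offer`, `onTick`, and the `Consumer<List<T>>` sink are hypothetical names standing in for the bolt, regular tuples, tick tuples, and the `mongoClient.insert(docs, ordered)` call. As in the patch, a failed write fails every item in the current batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the bolt's batching state; not part of storm-mongodb. */
class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink;          // stands in for mongoClient.insert(docs, ordered)
    private final List<T> pending = new ArrayList<>();
    private final List<T> acked = new ArrayList<>();
    private final List<T> failed = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> sink) {
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Buffers a regular item and flushes once the batch is full. */
    void offer(T item) {
        pending.add(item);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Called on a tick: flushes whatever is pending, even a partial batch. */
    void onTick() {
        flush();
    }

    private void flush() {
        if (pending.isEmpty()) {
            return;
        }
        try {
            sink.accept(new ArrayList<>(pending)); // one bulk write for the whole batch
            acked.addAll(pending);                 // ack only after the write succeeded
        } catch (RuntimeException e) {
            failed.addAll(pending);                // fail the whole batch so it can be replayed
        } finally {
            pending.clear();
        }
    }

    List<T> acked() { return acked; }
    List<T> failed() { return failed; }
    int pendingCount() { return pending.size(); }
}
```

The tick path is what bounds latency: with a large batch size and a slow stream, items would otherwise sit in the buffer until the size threshold is reached.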
[2/3] storm git commit: STORM-1652 Addressed review comments
STORM-1652 Addressed review comments

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/775b2af7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/775b2af7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/775b2af7

Branch: refs/heads/master
Commit: 775b2af78154baa01db81619ca32903e9f24aea8
Parents: 60f5dfb
Author: Satish Duggana
Authored: Tue Mar 29 10:12:50 2016 +0530
Committer: Satish Duggana
Committed: Tue Mar 29 10:12:50 2016 +0530

--
 docs/Trident-API-Overview.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/775b2af7/docs/Trident-API-Overview.md
--
diff --git a/docs/Trident-API-Overview.md b/docs/Trident-API-Overview.md
index 5a0ffbc..a303797 100644
--- a/docs/Trident-API-Overview.md
+++ b/docs/Trident-API-Overview.md
@@ -395,8 +395,8 @@ Example of using `HBaseWindowStoreFactory` for windowing can be seen below.
 Detailed description of all the above APIs in this section can be found [here](javadocs/org/apache/storm/trident/Stream.html)
  Example applications
-Example applications of these APIs are located at [TridentHBaseWindowingStoreTopology](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java)
-and [TridentWindowingInmemoryStoreTopology](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java)
+Example applications of these APIs are located at [TridentHBaseWindowingStoreTopology]({{page.git-blob-base}}/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java)
+and [TridentWindowingInmemoryStoreTopology]({{page.git-blob-base}}/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java)
 ### partitionAggregate
[1/3] storm git commit: STORM-1652 Added trident windowing API documentation to Trident API doc.
Repository: storm Updated Branches: refs/heads/master 98dbdcdb5 -> d5b80fcca STORM-1652 Added trident windowing API documentation to Trident API doc. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/60f5dfbc Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/60f5dfbc Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/60f5dfbc Branch: refs/heads/master Commit: 60f5dfbc56d293325472459e16cec537d59192a3 Parents: 31db7dc Author: Satish Duggana Authored: Mon Mar 28 12:48:40 2016 +0530 Committer: Satish Duggana Committed: Mon Mar 28 15:09:25 2016 +0530 -- docs/Trident-API-Overview.md | 100 ++ 1 file changed, 100 insertions(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/60f5dfbc/docs/Trident-API-Overview.md -- diff --git a/docs/Trident-API-Overview.md b/docs/Trident-API-Overview.md index d5893cc..5a0ffbc 100644 --- a/docs/Trident-API-Overview.md +++ b/docs/Trident-API-Overview.md @@ -299,6 +299,106 @@ Below example shows how these APIs can be used to find maximum using respective Example applications of these APIs can be located at [TridentMinMaxOfDevicesTopology](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java) and [TridentMinMaxOfVehiclesTopology](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java) +### Windowing +Trident streams can process tuples in batches which are of the same window and emit aggregated result to the next operation. +There are two kinds of windowing supported which are based on processing time or tuples count: +1. Tumbling window +2. Sliding window + + Tumbling window +Tuples are grouped in a single window based on processing time or count. Any tuple belongs to only one of the windows. 
+ +```java + +/** + * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples. + */ +public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory, + Fields inputFields, Aggregator aggregator, Fields functionFields); + +/** + * Returns a stream of tuples which are aggregated results of a window that tumbles at duration of {@code windowDuration} + */ +public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory, + Fields inputFields, Aggregator aggregator, Fields functionFields); + +``` + + Sliding window +Tuples are grouped in windows and window slides for every sliding interval. A tuple can belong to more than one window. + +```java + +/** + * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples + * and slides the window after {@code slideCount}. + */ +public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory, + Fields inputFields, Aggregator aggregator, Fields functionFields); + +/** + * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slidingInterval} + * and completes a window at {@code windowDuration} + */ +public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval, +WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields); +``` + +Examples of tumbling and sliding windows can be found [here](Windowing.html) + + Common windowing API +Below is the common windowing API which takes `WindowConfig` for any supported windowing configurations. + +```java + +public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields, + Aggregator aggregator, Fields functionFields) + +``` + +`windowConfig` can be any of the below. 
+ - `SlidingCountWindow.of(int windowCount, int slidingCount)` + - `SlidingDurationWindow.of(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingDuration)` + - `TumblingCountWindow.of(int windowLength)` + - `TumblingDurationWindow.of(BaseWindowedBolt.Duration windowLength)` + + +Trident windowing APIs need `WindowsStoreFactory` to store received tuples and aggregated values. Currently, basic implementation +for HBase is given with `HBaseWindowsStoreFactory`
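The difference between the two count-based window kinds described in the documentation added above can be illustrated in plain Java. This is only a sketch of the semantics, not Trident code: `WindowSketch`, `tumbling`, and `sliding` are hypothetical helpers, and the sketch emits complete windows only, which may differ from how Trident treats a window that is still filling.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper illustrating count-based window semantics; not a Trident API. */
class WindowSketch {

    /** Windows of windowCount items, with the start advancing by slideCount items each time. */
    static <T> List<List<T>> sliding(List<T> items, int windowCount, int slideCount) {
        if (windowCount <= 0 || slideCount <= 0) {
            throw new IllegalArgumentException("windowCount and slideCount must be positive");
        }
        List<List<T>> windows = new ArrayList<>();
        for (int start = 0; start + windowCount <= items.size(); start += slideCount) {
            // Copy the view so each window is independent of the input list.
            windows.add(new ArrayList<>(items.subList(start, start + windowCount)));
        }
        return windows;
    }

    /** A tumbling window is a sliding window whose slide equals its length, so windows never overlap. */
    static <T> List<List<T>> tumbling(List<T> items, int windowCount) {
        return sliding(items, windowCount, windowCount);
    }
}
```

For the items 1 through 6, `tumbling(items, 3)` yields `[1, 2, 3]` and `[4, 5, 6]`, so each item lands in exactly one window, while `sliding(items, 3, 1)` yields four overlapping windows starting with `[1, 2, 3]` and `[2, 3, 4]`.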
[3/3] storm git commit: Merge branch 'STORM-1652' of https://github.com/satishd/storm into STORM-1652
Merge branch 'STORM-1652' of https://github.com/satishd/storm into STORM-1652

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d5b80fcc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d5b80fcc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d5b80fcc

Branch: refs/heads/master
Commit: d5b80fccace4e964d2ac7cce338f416c4efe5a5a
Parents: 98dbdcd 775b2af
Author: Jungtaek Lim
Authored: Thu Mar 31 18:30:19 2016 +0900
Committer: Jungtaek Lim
Committed: Thu Mar 31 18:30:19 2016 +0900

--
 docs/Trident-API-Overview.md | 100 ++
 1 file changed, 100 insertions(+)
--
[1/2] storm git commit: STORM-1652 Added trident windowing API documentation to Trident API doc.
Repository: storm Updated Branches: refs/heads/1.x-branch 0e4af514d -> fe50a1b55 STORM-1652 Added trident windowing API documentation to Trident API doc. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/12c504e7 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/12c504e7 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/12c504e7 Branch: refs/heads/1.x-branch Commit: 12c504e7222183d33f52b1b3cad9a62839f8b405 Parents: 0e4af51 Author: Satish Duggana Authored: Mon Mar 28 12:48:40 2016 +0530 Committer: Jungtaek Lim Committed: Thu Mar 31 19:10:07 2016 +0900 -- docs/Trident-API-Overview.md | 100 ++ 1 file changed, 100 insertions(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/12c504e7/docs/Trident-API-Overview.md -- diff --git a/docs/Trident-API-Overview.md b/docs/Trident-API-Overview.md index d5893cc..5a0ffbc 100644 --- a/docs/Trident-API-Overview.md +++ b/docs/Trident-API-Overview.md @@ -299,6 +299,106 @@ Below example shows how these APIs can be used to find maximum using respective Example applications of these APIs can be located at [TridentMinMaxOfDevicesTopology](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java) and [TridentMinMaxOfVehiclesTopology](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java) +### Windowing +Trident streams can process tuples in batches which are of the same window and emit aggregated result to the next operation. +There are two kinds of windowing supported which are based on processing time or tuples count: +1. Tumbling window +2. Sliding window + + Tumbling window +Tuples are grouped in a single window based on processing time or count. Any tuple belongs to only one of the windows. 
+ +```java + +/** + * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples. + */ +public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory, + Fields inputFields, Aggregator aggregator, Fields functionFields); + +/** + * Returns a stream of tuples which are aggregated results of a window that tumbles at duration of {@code windowDuration} + */ +public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory, + Fields inputFields, Aggregator aggregator, Fields functionFields); + +``` + + Sliding window +Tuples are grouped in windows and window slides for every sliding interval. A tuple can belong to more than one window. + +```java + +/** + * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples + * and slides the window after {@code slideCount}. + */ +public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory, + Fields inputFields, Aggregator aggregator, Fields functionFields); + +/** + * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slidingInterval} + * and completes a window at {@code windowDuration} + */ +public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval, +WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields); +``` + +Examples of tumbling and sliding windows can be found [here](Windowing.html) + + Common windowing API +Below is the common windowing API which takes `WindowConfig` for any supported windowing configurations. + +```java + +public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields, + Aggregator aggregator, Fields functionFields) + +``` + +`windowConfig` can be any of the below. 
+ - `SlidingCountWindow.of(int windowCount, int slidingCount)` + - `SlidingDurationWindow.of(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingDuration)` + - `TumblingCountWindow.of(int windowLength)` + - `TumblingDurationWindow.of(BaseWindowedBolt.Duration windowLength)` + + +Trident windowing APIs need `WindowsStoreFactory` to store received tuples and aggregated values. Currently, basic implementation +for HBase is given with `HBaseWindowsStoreFa
[2/2] storm git commit: STORM-1652 Addressed review comments
STORM-1652 Addressed review comments

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fe50a1b5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fe50a1b5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fe50a1b5

Branch: refs/heads/1.x-branch
Commit: fe50a1b558ae1ccf28854a344062a52998096840
Parents: 12c504e
Author: Satish Duggana
Authored: Tue Mar 29 10:12:50 2016 +0530
Committer: Jungtaek Lim
Committed: Thu Mar 31 19:10:27 2016 +0900

--
 docs/Trident-API-Overview.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/fe50a1b5/docs/Trident-API-Overview.md
--
diff --git a/docs/Trident-API-Overview.md b/docs/Trident-API-Overview.md
index 5a0ffbc..a303797 100644
--- a/docs/Trident-API-Overview.md
+++ b/docs/Trident-API-Overview.md
@@ -395,8 +395,8 @@ Example of using `HBaseWindowStoreFactory` for windowing can be seen below.
 Detailed description of all the above APIs in this section can be found [here](javadocs/org/apache/storm/trident/Stream.html)
  Example applications
-Example applications of these APIs are located at [TridentHBaseWindowingStoreTopology](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java)
-and [TridentWindowingInmemoryStoreTopology](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java)
+Example applications of these APIs are located at [TridentHBaseWindowingStoreTopology]({{page.git-blob-base}}/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java)
+and [TridentWindowingInmemoryStoreTopology]({{page.git-blob-base}}/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java)
 ### partitionAggregate
[2/3] storm git commit: Merge branch 'STORM-1693-master' of https://github.com/abhishekagarwal87/storm into STORM-1693
Merge branch 'STORM-1693-master' of https://github.com/abhishekagarwal87/storm into STORM-1693

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/baaba667
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/baaba667
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/baaba667

Branch: refs/heads/master
Commit: baaba667e1ecea3f644974eda72bbfa922b31586
Parents: a41fef3 983c420
Author: Jungtaek Lim
Authored: Fri Apr 15 11:16:46 2016 +0900
Committer: Jungtaek Lim
Committed: Fri Apr 15 11:16:46 2016 +0900

--
 .../clj/org/apache/storm/daemon/executor.clj | 3 ++-
 .../storm/metric/internal/RateTracker.java | 26
 .../apache/storm/stats/BoltExecutorStats.java | 6 ++---
 .../apache/storm/stats/SpoutExecutorStats.java | 4 ---
 .../org/apache/storm/utils/DisruptorQueue.java | 5
 5 files changed, 10 insertions(+), 34 deletions(-)
--
[1/3] storm git commit: STORM-1693: Move stats cleanup to executor shutdown
Repository: storm Updated Branches: refs/heads/master a41fef386 -> 6cf8a9c99 STORM-1693: Move stats cleanup to executor shutdown Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/983c420b Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/983c420b Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/983c420b Branch: refs/heads/master Commit: 983c420bfb170fba0b6ae0fd1b1fb3b99d3a1079 Parents: 6415863 Author: Abhishek Agarwal Authored: Tue Apr 12 21:12:09 2016 +0530 Committer: Abhishek Agarwal Committed: Tue Apr 12 21:12:09 2016 +0530 -- .../clj/org/apache/storm/daemon/executor.clj| 3 ++- .../storm/metric/internal/RateTracker.java | 26 .../apache/storm/stats/BoltExecutorStats.java | 6 ++--- .../apache/storm/stats/SpoutExecutorStats.java | 4 --- .../org/apache/storm/utils/DisruptorQueue.java | 5 5 files changed, 10 insertions(+), 34 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/983c420b/storm-core/src/clj/org/apache/storm/daemon/executor.clj -- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index 8a77a61..6f7c18c 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -376,7 +376,8 @@ (doseq [t threads] (.interrupt t) (.join t)) - + +(.cleanupStats (:stats executor-data)) (doseq [user-context (map #(.getUserContext %) (vals task-datas))] (doseq [hook (.getHooks user-context)] (.cleanup hook))) http://git-wip-us.apache.org/repos/asf/storm/blob/983c420b/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java -- diff --git a/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java b/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java index 65ec931..92a8205 100644 --- a/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java +++ 
b/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java @@ -17,7 +17,6 @@ */ package org.apache.storm.metric.internal; -import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.atomic.AtomicLong; @@ -137,29 +136,4 @@ public class RateTracker{ rotateBuckets(System.currentTimeMillis()); } } - -public static void main (String args[]) throws Exception { -final int number = (args.length >= 1) ? Integer.parseInt(args[0]) : 1; -for (int i = 0; i < 10; i++) { -testRate(number); -} -} - -private static void testRate(int number) { -RateTracker rt = new RateTracker(1, 10); -long start = System.currentTimeMillis(); -for (int i = 0; i < number; i++) { -rt.notify(1); -if ((i % 100) == 0) { -//There is an issue with some JVM versions where an integer for loop that takes a long time -// can starve other threads resulting in the timer thread not getting called. -// This is a work around for that, and we still get the same results. -Thread.yield(); -} -} -long end = System.currentTimeMillis(); -double rate = rt.reportRate(); -rt.close(); -System.out.printf("time %,8d count %,8d rate %,15.2f reported rate %,15.2f\n", end-start,number, ((number * 1000.0)/(end-start)), rate); -} } http://git-wip-us.apache.org/repos/asf/storm/blob/983c420b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java -- diff --git a/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java index e26e56b..bfd0d36 100644 --- a/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java +++ b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java @@ -18,13 +18,15 @@ package org.apache.storm.stats; import com.google.common.collect.Lists; -import java.util.List; + import org.apache.storm.generated.BoltStats; import org.apache.storm.generated.ExecutorSpecificStats; import org.apache.storm.generated.ExecutorStats; import org.apache.storm.metric.internal.MultiCountStatAndMetric; 
import org.apache.storm.metric.internal.MultiLatencyStatAndMetric; +import java.util.List; + @SuppressWarnings("unchecked") public class BoltExecutorStats extends CommonStats { @@ -83,8 +85,6 @@ public class BoltExecutorStats extends CommonStats { } public
[2/3] storm git commit: Merge branch 'STORM-1693' of https://github.com/abhishekagarwal87/storm into STORM-1693-1.x
Merge branch 'STORM-1693' of https://github.com/abhishekagarwal87/storm into STORM-1693-1.x Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b20c63fb Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b20c63fb Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b20c63fb Branch: refs/heads/1.x-branch Commit: b20c63fb02f8ba5d3b011e013d5a2a5f61ef28fe Parents: ddc3c04 3923e17 Author: Jungtaek Lim Authored: Fri Apr 15 11:10:09 2016 +0900 Committer: Jungtaek Lim Committed: Fri Apr 15 11:10:09 2016 +0900 -- .../clj/org/apache/storm/daemon/executor.clj| 2 +- storm-core/src/clj/org/apache/storm/stats.clj | 20 ++- .../storm/metric/internal/RateTracker.java | 26 .../org/apache/storm/utils/DisruptorQueue.java | 16 +++- 4 files changed, 25 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/b20c63fb/storm-core/src/clj/org/apache/storm/stats.clj --
[3/3] storm git commit: add STORM-1693 to CHANGELOG.md
add STORM-1693 to CHANGELOG.md

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6cf8a9c9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6cf8a9c9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6cf8a9c9

Branch: refs/heads/master
Commit: 6cf8a9c99587b4e3541cd93c09811cf938d1a601
Parents: baaba66
Author: Jungtaek Lim
Authored: Fri Apr 15 11:32:56 2016 +0900
Committer: Jungtaek Lim
Committed: Fri Apr 15 11:32:56 2016 +0900

--
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/6cf8a9c9/CHANGELOG.md
--
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8c99c9b..614c1b4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -78,6 +78,7 @@
  * STORM-1521: When using Kerberos login from keytab with multiple bolts/executors ticket is not renewed in hbase bolt.
 
 ## 1.0.1
+ * STORM-1693: Move stats cleanup to executor shutdown
  * STORM-1670: LocalState#get(String) can throw FileNotFoundException which results in not removing worker heartbeats and supervisor is kind of stuck and goes down after some time.
  * STORM-1677: Test resource files are excluded from source distribution, which makes logviewer-test failing
  * STORM-1585: Add DDL support for UDFs in storm-sql
[3/3] storm git commit: add STORM-1693 to CHANGELOG.md, also move some issues to 1.0.1
add STORM-1693 to CHANGELOG.md, also move some issues to 1.0.1 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b511a8b4 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b511a8b4 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b511a8b4 Branch: refs/heads/1.x-branch Commit: b511a8b4f55d1ba1afddc3ac6a25ffd4f2aef42e Parents: b20c63f Author: Jungtaek Lim Authored: Fri Apr 15 11:15:54 2016 +0900 Committer: Jungtaek Lim Committed: Fri Apr 15 11:17:54 2016 +0900 -- CHANGELOG.md | 5 - 1 file changed, 4 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/b511a8b4/CHANGELOG.md -- diff --git a/CHANGELOG.md b/CHANGELOG.md index d81a240..3faa2d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,12 @@ -## 1.0.0 +## 1.0.1 + * STORM-1693: Move stats cleanup to executor shutdown * STORM-1585: Add DDL support for UDFs in storm-sql * STORM-1681: Bug in scheduling cyclic topologies when scheduling with RAS * STORM-1706: Add RELEASE and storm-env.sh to storm-diet assembly * STORM-1613: Upgraded HBase version to 1.1.0 * STORM-1687: divide by zero in stats + +## 1.0.0 * STORM-1670: LocalState#get(String) can throw FileNotFoundException which may result supervisor.clj#sync-processes stop assigning new workers/assignments * STORM-1677: Test resource files are excluded from source distribution, which makes logviewer-test failing * STORM-676: Storm Trident support for sliding/tumbling windows
[1/3] storm git commit: STORM-1693: Move stats cleanup to executor shutdown
Repository: storm Updated Branches: refs/heads/1.x-branch ddc3c0458 -> b511a8b4f STORM-1693: Move stats cleanup to executor shutdown Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3923e17e Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3923e17e Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3923e17e Branch: refs/heads/1.x-branch Commit: 3923e17ec55ce2954ca8c1966dbd017ed6a67fef Parents: a240df5 Author: Abhishek Agarwal Authored: Tue Apr 12 15:14:42 2016 +0530 Committer: Abhishek Agarwal Committed: Tue Apr 12 15:14:42 2016 +0530 -- .../clj/org/apache/storm/daemon/executor.clj| 2 +- storm-core/src/clj/org/apache/storm/stats.clj | 20 ++- .../storm/metric/internal/RateTracker.java | 26 .../org/apache/storm/utils/DisruptorQueue.java | 16 +++- 4 files changed, 25 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/3923e17e/storm-core/src/clj/org/apache/storm/daemon/executor.clj -- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index e974191..9ea4eb4 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -415,7 +415,7 @@ (doseq [t threads] (.interrupt t) (.join t)) - +(stats/cleanup-stats! 
(:stats executor-data)) (doseq [user-context (map :user-context (vals task-datas))] (doseq [hook (.getHooks user-context)] (.cleanup hook))) http://git-wip-us.apache.org/repos/asf/storm/blob/3923e17e/storm-core/src/clj/org/apache/storm/stats.clj -- diff --git a/storm-core/src/clj/org/apache/storm/stats.clj b/storm-core/src/clj/org/apache/storm/stats.clj index 68b16fd..9f36dbf 100644 --- a/storm-core/src/clj/org/apache/storm/stats.clj +++ b/storm-core/src/clj/org/apache/storm/stats.clj @@ -153,25 +153,25 @@ [^SpoutExecutorStats stats stream latency-ms] (.incBy ^MultiCountStatAndMetric (stats-failed stats) stream (stats-rate stats))) -(defn- cleanup-stat! [stat] +(defn- close-stat! [stat] (.close stat)) (defn- cleanup-common-stats! [^CommonStats stats] (doseq [f COMMON-FIELDS] -(cleanup-stat! (f stats +(close-stat! (f stats (defn cleanup-bolt-stats! [^BoltExecutorStats stats] (cleanup-common-stats! (:common stats)) (doseq [f BOLT-FIELDS] -(cleanup-stat! (f stats +(close-stat! (f stats (defn cleanup-spout-stats! [^SpoutExecutorStats stats] (cleanup-common-stats! (:common stats)) (doseq [f SPOUT-FIELDS] -(cleanup-stat! (f stats +(close-stat! (f stats (defn- value-stats [stats fields] @@ -188,14 +188,12 @@ (defn value-bolt-stats! [^BoltExecutorStats stats] - (cleanup-bolt-stats! stats) (merge (value-common-stats (:common stats)) (value-stats stats BOLT-FIELDS) {:type :bolt})) (defn value-spout-stats! [^SpoutExecutorStats stats] - (cleanup-spout-stats! stats) (merge (value-common-stats (:common stats)) (value-stats stats SPOUT-FIELDS) {:type :spout})) @@ -210,6 +208,16 @@ [stats] (value-bolt-stats! stats)) +(defmulti cleanup-stats! class-selector) + +(defmethod cleanup-stats! SpoutExecutorStats + [stats] + (cleanup-spout-stats! stats)) + +(defmethod cleanup-stats! BoltExecutorStats + [stats] + (cleanup-bolt-stats! 
stats)) + (defmulti thriftify-specific-stats :type) (defmulti clojurify-specific-stats class-selector) http://git-wip-us.apache.org/repos/asf/storm/blob/3923e17e/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java -- diff --git a/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java b/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java index 65ec931..92a8205 100644 --- a/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java +++ b/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java @@ -17,7 +17,6 @@ */ package org.apache.storm.metric.internal; -import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.atomic.AtomicLong; @@ -137,29 +136,4 @@ public class RateTracker{ rotateBuckets(System.currentTimeMillis()); } } - -public static void main (String args[]) throws Exception { -final int number = (args.length >= 1) ? Integer.parseInt(args[0]) : 1; -for (int i = 0; i < 10; i++) { -testRate(number); -} -} - -
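Editor's note: in the stats.clj hunks above, `value-bolt-stats!` and `value-spout-stats!` no longer call the cleanup functions, and executor shutdown calls `cleanup-stats!` once after the threads have been joined. The lifecycle this implies (reads have no side effects, resources are released once at shutdown) can be sketched in Java as follows. This is a minimal illustration only: `Stat`, `ExecutorStats`, and every method name below are hypothetical and are not Storm's classes.

```java
import java.util.ArrayList;
import java.util.List;

public class StatsLifecycleSketch {

    /** Hypothetical stat that owns a resource, such as a background timer. */
    static final class Stat implements AutoCloseable {
        private long count;
        private boolean closed;

        void incBy(long n) {
            if (closed) {
                throw new IllegalStateException("stat already closed");
            }
            count += n;
        }

        long value() { return count; }

        boolean isClosed() { return closed; }

        @Override
        public void close() { closed = true; }
    }

    /** Hypothetical per-executor holder for a fixed number of stats. */
    static final class ExecutorStats {
        private final List<Stat> stats = new ArrayList<>();

        ExecutorStats(int fields) {
            for (int i = 0; i < fields; i++) {
                stats.add(new Stat());
            }
        }

        void record(int field, long n) { stats.get(field).incBy(n); }

        // Reading has no side effects, so it can run on every report.
        long[] snapshot() {
            long[] out = new long[stats.size()];
            for (int i = 0; i < out.length; i++) {
                out[i] = stats.get(i).value();
            }
            return out;
        }

        // Intended to be called once, from the shutdown path.
        void cleanup() {
            for (Stat s : stats) {
                s.close();
            }
        }

        boolean allClosed() {
            for (Stat s : stats) {
                if (!s.isClosed()) {
                    return false;
                }
            }
            return true;
        }
    }

    public static void main(String[] args) {
        ExecutorStats stats = new ExecutorStats(2);
        stats.record(0, 3);
        stats.snapshot();          // a read in the middle of the run
        stats.record(0, 2);        // recording still works after the read
        System.out.println(stats.snapshot()[0]);
        stats.cleanup();           // shutdown: release everything once
        System.out.println(stats.allClosed());
    }
}
```

If `snapshot()` closed the stats, as the pre-patch readers did with their cleanup call, the second `record` in `main` would throw in this sketch.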
[4/4] storm git commit: add STORM-1696 to CHANGELOG.md
add STORM-1696 to CHANGELOG.md Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/97e131d5 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/97e131d5 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/97e131d5 Branch: refs/heads/master Commit: 97e131d5a2d4140fec02aa3a05b5554b6fc289f4 Parents: 34d9515 Author: Jungtaek Lim Authored: Fri Apr 15 11:50:40 2016 +0900 Committer: Jungtaek Lim Committed: Fri Apr 15 11:50:40 2016 +0900 -- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/97e131d5/CHANGELOG.md -- diff --git a/CHANGELOG.md b/CHANGELOG.md index 614c1b4..77d35e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -78,6 +78,7 @@ * STORM-1521: When using Kerberos login from keytab with multiple bolts/executors ticket is not renewed in hbase bolt. ## 1.0.1 + * STORM-1696: status not sync if zk fails in backpressure * STORM-1693: Move stats cleanup to executor shutdown * STORM-1670: LocalState#get(String) can throw FileNotFoundException which results in not removing worker heartbeats and supervisor is kind of stuck and goes down after some time. * STORM-1677: Test resource files are excluded from source distribution, which makes logviewer-test failing
[3/4] storm git commit: Merge branch '1696b' of https://github.com/zhuoliu/storm into STORM-1696
Merge branch '1696b' of https://github.com/zhuoliu/storm into STORM-1696 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/34d95156 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/34d95156 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/34d95156 Branch: refs/heads/master Commit: 34d95156dbec702d1ea5d20b2c9247e88fbb717e Parents: 6cf8a9c 63518be Author: Jungtaek Lim Authored: Fri Apr 15 11:43:23 2016 +0900 Committer: Jungtaek Lim Committed: Fri Apr 15 11:43:23 2016 +0900 -- .../src/clj/org/apache/storm/daemon/worker.clj | 19 +++ .../org/apache/storm/utils/DisruptorQueue.java | 8 ++-- 2 files changed, 17 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/34d95156/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java --
[1/4] storm git commit: [STORM-1696] status not sync if zk fails in backpressure
Repository: storm Updated Branches: refs/heads/master 6cf8a9c99 -> 97e131d5a [STORM-1696] status not sync if zk fails in backpressure Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a70195d7 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a70195d7 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a70195d7 Branch: refs/heads/master Commit: a70195d717e1ed179425a0305b2e4279afeb6118 Parents: 6415863 Author: zhuoliu Authored: Wed Apr 13 13:04:57 2016 -0500 Committer: zhuoliu Committed: Wed Apr 13 13:04:57 2016 -0500 -- .../src/clj/org/apache/storm/daemon/worker.clj | 19 +++ .../org/apache/storm/utils/DisruptorQueue.java | 4 2 files changed, 15 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/a70195d7/storm-core/src/clj/org/apache/storm/daemon/worker.clj -- diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj index 8d30948..883630b 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj @@ -145,16 +145,19 @@ assignment-id (:assignment-id worker) port (:port worker) storm-cluster-state (:storm-cluster-state worker) -prev-backpressure-flag @(:backpressure worker)] -(when executors - (reset! 
(:backpressure worker) - (or @(:transfer-backpressure worker) - (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors) +prev-backpressure-flag @(:backpressure worker) +;; the backpressure flag is true if at least one of the disruptor queues has throttle-on +curr-backpressure-flag (if executors + (or (.getThrottleOn (:transfer-queue worker)) + (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors))) + prev-backpressure-flag)] ;; update the worker's backpressure flag to zookeeper only when it has changed -(log-debug "BP " @(:backpressure worker) " WAS " prev-backpressure-flag) -(when (not= prev-backpressure-flag @(:backpressure worker)) +(when (not= prev-backpressure-flag curr-backpressure-flag) (try -(.workerBackpressure storm-cluster-state storm-id assignment-id (long port) @(:backpressure worker)) +(log-debug "worker backpressure flag changing from " prev-backpressure-flag " to " curr-backpressure-flag) +(.workerBackpressure storm-cluster-state storm-id assignment-id (long port) curr-backpressure-flag) +;; doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception +(reset! (:backpressure worker) curr-backpressure-flag) (catch Exception exc (log-error exc "workerBackpressure update failed when connecting to ZK ... will retry" http://git-wip-us.apache.org/repos/asf/storm/blob/a70195d7/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java -- diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java index 4482297..d310337 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java +++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java @@ -544,4 +544,8 @@ public class DisruptorQueue implements IStatefulObject { public QueueMetrics getMetrics() { return _metrics; } + + public boolean getThrottleOn() { + return _throttleOn; + } }
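Editor's note: the worker.clj hunk above computes the new flag first, writes it to ZooKeeper, and only then resets the local `:backpressure` atom, so a failed write leaves the old local value in place and the next pass sees the difference again. A minimal Java sketch of that ordering follows. `RemoteStore` and `BackpressureSyncSketch` are hypothetical stand-ins, not Storm's cluster-state API.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class BackpressureSyncSketch {

    /** Hypothetical stand-in for the ZooKeeper-backed cluster state. */
    interface RemoteStore {
        void publish(boolean flag) throws Exception;
    }

    private final AtomicBoolean localFlag = new AtomicBoolean(false);
    private final RemoteStore store;

    BackpressureSyncSketch(RemoteStore store) {
        this.store = store;
    }

    boolean localFlag() { return localFlag.get(); }

    /** Returns true only if the remote store accepted the new value. */
    boolean sync(boolean current) {
        boolean previous = localFlag.get();
        if (previous == current) {
            return false; // nothing changed, no remote write needed
        }
        try {
            store.publish(current);
            // Update local state only after the remote write succeeded.
            localFlag.set(current);
            return true;
        } catch (Exception e) {
            // Local flag keeps the old value, so the next call retries.
            return false;
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails on the first publish, succeeds afterwards.
        RemoteStore flaky = flag -> {
            if (calls[0]++ == 0) {
                throw new Exception("remote store unavailable");
            }
        };
        BackpressureSyncSketch worker = new BackpressureSyncSketch(flaky);
        System.out.println(worker.sync(true) + " " + worker.localFlag());
        System.out.println(worker.sync(true) + " " + worker.localFlag());
    }
}
```

Had the local flag been set before the publish, the failed first call would leave local and remote state out of step, and the equality check would suppress the retry.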
[2/4] storm git commit: minor
minor

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/63518be1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/63518be1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/63518be1

Branch: refs/heads/master
Commit: 63518be11275104314dcebbf83bb1f8a513891ee
Parents: a70195d
Author: zhuoliu
Authored: Wed Apr 13 13:08:19 2016 -0500
Committer: zhuoliu
Committed: Wed Apr 13 13:08:19 2016 -0500

--
 storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/63518be1/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
--
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index d310337..d09b831 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@ -145,8 +145,8 @@ public class DisruptorQueue implements IStatefulObject {
 if (_enableBackpressure && _cb != null && (_metrics.population() + _overflowCount.get()) >= _highWaterMark) {
     try {
         if (!_throttleOn) {
-            _cb.highWaterMark();
             _throttleOn = true;
+            _cb.highWaterMark();
         }
     } catch (Exception e) {
         throw new RuntimeException("Exception during calling highWaterMark callback!", e);
@@ -199,8 +199,8 @@ public class DisruptorQueue implements IStatefulObject {
 if (_enableBackpressure && _cb != null && (_metrics.population() + _overflowCount.get()) >= _highWaterMark) {
     try {
         if (!_throttleOn) {
-            _cb.highWaterMark();
             _throttleOn = true;
+            _cb.highWaterMark();
         }
     } catch (Exception e) {
         throw new RuntimeException("Exception during calling highWaterMark callback!", e);
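Editor's note: the commit message gives no rationale for swapping the two statements. One plausible reading, given that the preceding STORM-1696 commit makes the worker read `getThrottleOn()` from the queue, is that the callback should observe the flag already set. The Java sketch below illustrates only that ordering effect, under this assumption. `Queue`, `Callback`, and `observedByCallback` are hypothetical names, not DisruptorQueue's API.

```java
public class ThrottleOrderSketch {

    /** Hypothetical callback, fired when the queue crosses its high-water mark. */
    interface Callback {
        void highWaterMark();
    }

    /** Hypothetical queue that can run either statement ordering. */
    static final class Queue {
        private volatile boolean throttleOn;
        private final boolean flagFirst;
        Callback cb;

        Queue(boolean flagFirst) {
            this.flagFirst = flagFirst;
        }

        boolean getThrottleOn() { return throttleOn; }

        void onHighWaterMark() {
            if (throttleOn) {
                return; // already throttling
            }
            if (flagFirst) {
                throttleOn = true;   // ordering after the patch
                cb.highWaterMark();
            } else {
                cb.highWaterMark();  // ordering before the patch
                throttleOn = true;
            }
        }
    }

    /** Returns the flag value that the callback sees while it runs. */
    static boolean observedByCallback(boolean flagFirst) {
        Queue q = new Queue(flagFirst);
        boolean[] seen = new boolean[1];
        q.cb = () -> seen[0] = q.getThrottleOn();
        q.onHighWaterMark();
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("flag set first:     " + observedByCallback(true));
        System.out.println("callback run first: " + observedByCallback(false));
    }
}
```

With the callback invoked first, anything it triggers that reads the flag sees the stale value, even though the flag is set immediately afterwards.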
[2/3] storm git commit: Merge branch 'STORM-1696-1.x-branch' of https://github.com/zhuoliu/storm into STORM-1696-1.x
Merge branch 'STORM-1696-1.x-branch' of https://github.com/zhuoliu/storm into STORM-1696-1.x Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bd553198 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bd553198 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bd553198 Branch: refs/heads/1.x-branch Commit: bd553198f96528dd7fb637601ad8925d5339552b Parents: b511a8b 9271056 Author: Jungtaek Lim Authored: Fri Apr 15 11:53:04 2016 +0900 Committer: Jungtaek Lim Committed: Fri Apr 15 11:53:04 2016 +0900 -- .../src/clj/org/apache/storm/daemon/worker.clj | 19 +++ .../org/apache/storm/utils/DisruptorQueue.java | 8 ++-- 2 files changed, 17 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/bd553198/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java --
[1/3] storm git commit: [STORM-1696]-1.x-branch status not sync if zk fails in backpressure
Repository: storm Updated Branches: refs/heads/1.x-branch b511a8b4f -> 3a4825ed9 [STORM-1696]-1.x-branch status not sync if zk fails in backpressure Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9271056b Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9271056b Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9271056b Branch: refs/heads/1.x-branch Commit: 9271056b22ab5c734157a9ca1f3f4ab9a28d4b4b Parents: 6af0d10 Author: zhuol Authored: Thu Apr 7 18:12:33 2016 -0500 Committer: zhuol Committed: Thu Apr 7 18:12:33 2016 -0500 -- .../src/clj/org/apache/storm/daemon/worker.clj | 19 +++ .../org/apache/storm/utils/DisruptorQueue.java | 8 ++-- 2 files changed, 17 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/9271056b/storm-core/src/clj/org/apache/storm/daemon/worker.clj -- diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj index 778e83d..b8bc423 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj @@ -138,16 +138,19 @@ assignment-id (:assignment-id worker) port (:port worker) storm-cluster-state (:storm-cluster-state worker) -prev-backpressure-flag @(:backpressure worker)] -(when executors - (reset! 
(:backpressure worker) - (or @(:transfer-backpressure worker) - (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors) +prev-backpressure-flag @(:backpressure worker) +;; the backpressure flag is true if at least one of the disruptor queues has throttle-on +curr-backpressure-flag (if executors + (or (.getThrottleOn (:transfer-queue worker)) + (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors))) + prev-backpressure-flag)] ;; update the worker's backpressure flag to zookeeper only when it has changed -(log-debug "BP " @(:backpressure worker) " WAS " prev-backpressure-flag) -(when (not= prev-backpressure-flag @(:backpressure worker)) +(when (not= prev-backpressure-flag curr-backpressure-flag) (try -(.worker-backpressure! storm-cluster-state storm-id assignment-id port @(:backpressure worker)) +(log-debug "worker backpressure flag changing from " prev-backpressure-flag " to " curr-backpressure-flag) +(.worker-backpressure! storm-cluster-state storm-id assignment-id port curr-backpressure-flag) +;; doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception +(reset! (:backpressure worker) curr-backpressure-flag) (catch Exception exc (log-error exc "workerBackpressure update failed when connecting to ZK ... 
will retry" http://git-wip-us.apache.org/repos/asf/storm/blob/9271056b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java -- diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java index 19aba06..9f39d06 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java +++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java @@ -146,8 +146,8 @@ public class DisruptorQueue implements IStatefulObject { if (_enableBackpressure && _cb != null && (_metrics.population() + _overflowCount.get()) >= _highWaterMark) { try { if (!_throttleOn) { -_cb.highWaterMark(); _throttleOn = true; +_cb.highWaterMark(); } } catch (Exception e) { throw new RuntimeException("Exception during calling highWaterMark callback!", e); @@ -200,8 +200,8 @@ public class DisruptorQueue implements IStatefulObject { if (_enableBackpressure && _cb != null && (_metrics.population() + _overflowCount.get()) >= _highWaterMark) { try { if (!_throttleOn) { -_cb.highWaterMark(); _throttleOn = true; +_cb.highWaterMark(); } } catch (Exception e) { throw new RuntimeException("Exception during calling highWaterMark callback!", e); @@ -537,6 +5
[3/3] storm git commit: add STORM-1696 to CHANGELOG.md
add STORM-1696 to CHANGELOG.md Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3a4825ed Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3a4825ed Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3a4825ed Branch: refs/heads/1.x-branch Commit: 3a4825ed91bed93c976999b8f018556e8063f561 Parents: bd55319 Author: Jungtaek Lim Authored: Fri Apr 15 11:53:50 2016 +0900 Committer: Jungtaek Lim Committed: Fri Apr 15 11:53:50 2016 +0900 -- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/3a4825ed/CHANGELOG.md -- diff --git a/CHANGELOG.md b/CHANGELOG.md index 3faa2d8..32fad9e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ ## 1.0.1 + * STORM-1696: status not sync if zk fails in backpressure * STORM-1693: Move stats cleanup to executor shutdown * STORM-1585: Add DDL support for UDFs in storm-sql * STORM-1681: Bug in scheduling cyclic topologies when scheduling with RAS
[2/3] storm git commit: Merge branch 'STORM-1649-1.x' of https://github.com/satishd/storm into STORM-1649-1.x
Merge branch 'STORM-1649-1.x' of https://github.com/satishd/storm into STORM-1649-1.x Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/eb0ea70b Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/eb0ea70b Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/eb0ea70b Branch: refs/heads/1.x-branch Commit: eb0ea70bbb54bd4a5239d77bfc20ba86875e6e13 Parents: 3a4825e 72d5225 Author: Jungtaek Lim Authored: Wed Apr 20 16:11:43 2016 +0900 Committer: Jungtaek Lim Committed: Wed Apr 20 16:11:43 2016 +0900 -- .../trident/windowing/HBaseWindowsStore.java| 42 +- .../windowing/HBaseWindowsStoreFactory.java | 4 +- .../windowing/InMemoryWindowsStoreFactory.java | 6 +- .../trident/windowing/WindowKryoSerializer.java | 87 .../windowing/WindowTridentProcessor.java | 8 +- .../trident/windowing/WindowsStateUpdater.java | 2 +- .../storm/trident/windowing/WindowsStore.java | 1 + .../trident/windowing/WindowsStoreFactory.java | 9 +- 8 files changed, 122 insertions(+), 37 deletions(-) --
[1/3] storm git commit: STORM-1649 kryo serialization in windowing
Repository: storm Updated Branches: refs/heads/1.x-branch 3a4825ed9 -> ac7b828ad STORM-1649 kryo serialization in windowing Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/72d5225f Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/72d5225f Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/72d5225f Branch: refs/heads/1.x-branch Commit: 72d5225fed3d74df28a89ee9e37a7957ec791656 Parents: 3a4825e Author: Satish Duggana Authored: Tue Mar 29 10:02:28 2016 +0530 Committer: Satish Duggana Committed: Wed Apr 20 11:33:16 2016 +0530 -- .../trident/windowing/HBaseWindowsStore.java| 42 +- .../windowing/HBaseWindowsStoreFactory.java | 4 +- .../windowing/InMemoryWindowsStoreFactory.java | 6 +- .../trident/windowing/WindowKryoSerializer.java | 87 .../windowing/WindowTridentProcessor.java | 8 +- .../trident/windowing/WindowsStateUpdater.java | 2 +- .../storm/trident/windowing/WindowsStore.java | 1 + .../trident/windowing/WindowsStoreFactory.java | 9 +- 8 files changed, 122 insertions(+), 37 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/72d5225f/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java -- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java index d25b898..e319a55 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java @@ -18,31 +18,27 @@ */ package org.apache.storm.hbase.trident.windowing; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Delete; 
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.storm.trident.windowing.WindowKryoSerializer; import org.apache.storm.trident.windowing.WindowsStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.InterruptedIOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; @@ -55,11 +51,12 @@ public class HBaseWindowsStore implements WindowsStore { public static final String UTF_8 = "utf-8"; private final ThreadLocal threadLocalHtable; +private final ThreadLocal threadLocalWindowKryoSerializer; private final Queue htables = new ConcurrentLinkedQueue<>(); private final byte[] family; private final byte[] qualifier; -public HBaseWindowsStore(final Configuration config, final String tableName, byte[] family, byte[] qualifier) { +public HBaseWindowsStore(final Map stormConf, final Configuration config, final String tableName, byte[] family, byte[] qualifier) { this.family = family; this.qualifier = qualifier; @@ -76,12 +73,23 @@ public class HBaseWindowsStore implements WindowsStore { } }; +threadLocalWindowKryoSerializer = new ThreadLocal(){ +@Override +protected WindowKryoSerializer initialValue() { +return new WindowKryoSerializer(stormConf); +} +}; + } private HTable htable() { return threadLocalHtable.get(); } +private WindowKryoSerializer windowKryoSerializer() { +return threadLocalWindowKryoSerializer.get(); +} + private byte[] 
effectiveKey(String key) { try { return key.getBytes(UTF_8); @@ -107,11 +115,7 @@ public class HBaseWindowsStore implements WindowsStore { return null; } -Kryo kryo = new Kryo(); -Input input = new Input(result.getValue(family, qualifier)); -Object resultObject = kryo.readClassAndObject(input); -return resultObject; - +return windowKryoSerializer().deserialize(result.getValue(family, qualifier));
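Editor's note: the HBaseWindowsStore hunks above replace a `new Kryo()` per read with a `WindowKryoSerializer` held in a `ThreadLocal`, so each thread builds its serializer once and reuses it. The Java sketch below shows the same reuse pattern without any Kryo dependency. `Serializer` is a hypothetical stand-in for a costly, non-thread-safe object, and `ThreadLocal.withInitial` is used here in place of the anonymous `initialValue()` subclass in the diff.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadLocalSerializerSketch {

    /** Hypothetical serializer that is costly to build and not thread-safe. */
    static final class Serializer {
        byte[] serialize(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }

        String deserialize(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    // Counts how many serializers this store has built.
    final AtomicInteger created = new AtomicInteger();

    // One serializer per thread, built lazily on first use.
    private final ThreadLocal<Serializer> serializer =
            ThreadLocal.withInitial(() -> {
                created.incrementAndGet();
                return new Serializer();
            });

    String roundTrip(String value) {
        Serializer s = serializer.get();
        return s.deserialize(s.serialize(value));
    }

    /** Runs the given number of threads and reports how many serializers were built. */
    static int serializersCreated(int threads, int callsPerThread) {
        ThreadLocalSerializerSketch store = new ThreadLocalSerializerSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int c = 0; c < callsPerThread; c++) {
                    store.roundTrip("tuple-" + c);
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return store.created.get();
    }

    public static void main(String[] args) {
        // Number of serializers built grows with threads, not with calls.
        System.out.println(serializersCreated(3, 100));
    }
}
```

The trade-off is one retained instance per thread for the thread's lifetime, in exchange for avoiding a new instance on every call.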
[3/3] storm git commit: add STORM-1649 to CHANGELOG.md
add STORM-1649 to CHANGELOG.md Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ac7b828a Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ac7b828a Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ac7b828a Branch: refs/heads/1.x-branch Commit: ac7b828adf9a06f2e18c9d63bf1d4fd1797780a1 Parents: eb0ea70 Author: Jungtaek Lim Authored: Wed Apr 20 16:12:15 2016 +0900 Committer: Jungtaek Lim Committed: Wed Apr 20 16:12:15 2016 +0900 -- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/ac7b828a/CHANGELOG.md -- diff --git a/CHANGELOG.md b/CHANGELOG.md index 32fad9e..bb2f795 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ ## 1.0.1 + * STORM-1649: Optimize Kryo instaces creation in HBaseWindowsStore * STORM-1696: status not sync if zk fails in backpressure * STORM-1693: Move stats cleanup to executor shutdown * STORM-1585: Add DDL support for UDFs in storm-sql
[2/3] storm git commit: Merge branch 'STORM-1649-1' of https://github.com/satishd/storm into STORM-1649
Merge branch 'STORM-1649-1' of https://github.com/satishd/storm into STORM-1649 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3cee864b Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3cee864b Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3cee864b Branch: refs/heads/master Commit: 3cee864ba23f350cffa53f17542e27b16419b1f0 Parents: e9785d8 b850847 Author: Jungtaek Lim Authored: Wed Apr 20 16:09:59 2016 +0900 Committer: Jungtaek Lim Committed: Wed Apr 20 16:09:59 2016 +0900 -- .../trident/windowing/HBaseWindowsStore.java| 40 + .../windowing/HBaseWindowsStoreFactory.java | 4 +- .../windowing/InMemoryWindowsStoreFactory.java | 6 +- .../trident/windowing/WindowKryoSerializer.java | 87 .../windowing/WindowTridentProcessor.java | 8 +- .../trident/windowing/WindowsStateUpdater.java | 2 +- .../storm/trident/windowing/WindowsStore.java | 1 + .../trident/windowing/WindowsStoreFactory.java | 9 +- 8 files changed, 122 insertions(+), 35 deletions(-) --
[3/3] storm git commit: add STORM-1649 to CHANGELOG.md
add STORM-1649 to CHANGELOG.md Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d593918a Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d593918a Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d593918a Branch: refs/heads/master Commit: d593918a50034a140c42fd20d824bebea8fd6b34 Parents: 3cee864 Author: Jungtaek Lim Authored: Wed Apr 20 16:10:42 2016 +0900 Committer: Jungtaek Lim Committed: Wed Apr 20 16:10:42 2016 +0900 -- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/d593918a/CHANGELOG.md -- diff --git a/CHANGELOG.md b/CHANGELOG.md index 77d35e0..0fb0134 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -78,6 +78,7 @@ * STORM-1521: When using Kerberos login from keytab with multiple bolts/executors ticket is not renewed in hbase bolt. ## 1.0.1 + * STORM-1649: Optimize Kryo instaces creation in trident windowing * STORM-1696: status not sync if zk fails in backpressure * STORM-1693: Move stats cleanup to executor shutdown * STORM-1670: LocalState#get(String) can throw FileNotFoundException which results in not removing worker heartbeats and supervisor is kind of stuck and goes down after some time.
[1/3] storm git commit: STORM-1649 kryo serialization in windowing
Repository: storm
Updated Branches:
  refs/heads/master e9785d8f1 -> d593918a5

STORM-1649 kryo serialization in windowing

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b850847d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b850847d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b850847d

Branch: refs/heads/master
Commit: b850847d946ba5ad9809308065352a94bd6287b3
Parents: 33e4994
Author: Satish Duggana
Authored: Tue Mar 29 10:02:28 2016 +0530
Committer: Satish Duggana
Committed: Fri Apr 1 09:28:22 2016 +0530

--
 .../trident/windowing/HBaseWindowsStore.java| 40 +
 .../windowing/HBaseWindowsStoreFactory.java | 4 +-
 .../windowing/InMemoryWindowsStoreFactory.java | 6 +-
 .../trident/windowing/WindowKryoSerializer.java | 87
 .../windowing/WindowTridentProcessor.java | 8 +-
 .../trident/windowing/WindowsStateUpdater.java | 2 +-
 .../storm/trident/windowing/WindowsStore.java | 1 +
 .../trident/windowing/WindowsStoreFactory.java | 9 +-
 8 files changed, 122 insertions(+), 35 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/b850847d/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
--
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
index ccce03a..e319a55 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
@@ -18,9 +18,6 @@
  */
 package org.apache.storm.hbase.trident.windowing;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -29,11 +26,11 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.storm.trident.windowing.WindowKryoSerializer;
 import org.apache.storm.trident.windowing.WindowsStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
@@ -41,6 +38,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -53,11 +51,12 @@ public class HBaseWindowsStore implements WindowsStore {
     public static final String UTF_8 = "utf-8";
 
     private final ThreadLocal threadLocalHtable;
+    private final ThreadLocal threadLocalWindowKryoSerializer;
     private final Queue htables = new ConcurrentLinkedQueue<>();
     private final byte[] family;
     private final byte[] qualifier;
 
-    public HBaseWindowsStore(final Configuration config, final String tableName, byte[] family, byte[] qualifier) {
+    public HBaseWindowsStore(final Map stormConf, final Configuration config, final String tableName, byte[] family, byte[] qualifier) {
         this.family = family;
         this.qualifier = qualifier;
 
@@ -74,12 +73,23 @@ public class HBaseWindowsStore implements WindowsStore {
             }
         };
 
+        threadLocalWindowKryoSerializer = new ThreadLocal(){
+            @Override
+            protected WindowKryoSerializer initialValue() {
+                return new WindowKryoSerializer(stormConf);
+            }
+        };
+
     }
 
     private HTable htable() {
         return threadLocalHtable.get();
     }
 
+    private WindowKryoSerializer windowKryoSerializer() {
+        return threadLocalWindowKryoSerializer.get();
+    }
+
     private byte[] effectiveKey(String key) {
         try {
             return key.getBytes(UTF_8);
@@ -105,11 +115,7 @@ public class HBaseWindowsStore implements WindowsStore {
             return null;
         }
 
-        Kryo kryo = new Kryo();
-        Input input = new Input(result.getValue(family, qualifier));
-        Object resultObject = kryo.readClassAndObject(input);
-        return resultObject;
-
+        return windowKryoSerializer().deserialize(result.getValue(family, qualifier));
     }
 
     @Override
@@ -129,7 +135,6 @@ public class HBaseWindowsStore implements WindowsStore {
 throw new RuntimeException
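The diff above stops creating a `Kryo` instance on every read and instead keeps one `WindowKryoSerializer` per thread in a `ThreadLocal`. A minimal sketch of that per-thread reuse pattern follows. It assumes a stand-in serializer built on Java's own object streams rather than Kryo; `CountingSerializer` and `PerThreadSerializerDemo` are hypothetical names and are not part of Storm.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a serializer that is costly to build and not thread-safe.
final class CountingSerializer {
    // Counts how many serializer instances were ever constructed.
    static final AtomicInteger INSTANCES = new AtomicInteger();

    CountingSerializer() {
        INSTANCES.incrementAndGet();
    }

    byte[] serialize(Object value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}

final class PerThreadSerializerDemo {
    // One serializer per thread, created lazily on first use by that thread.
    private static final ThreadLocal<CountingSerializer> SERIALIZER =
            ThreadLocal.withInitial(CountingSerializer::new);

    static Object roundTrip(Object value) {
        CountingSerializer serializer = SERIALIZER.get();
        return serializer.deserialize(serializer.serialize(value));
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            System.out.println(roundTrip("window-" + i));
        }
        System.out.println("serializers built: " + CountingSerializer.INSTANCES.get());
    }
}
```

Repeated calls from the same thread reuse the instance, so construction cost is paid once per thread rather than once per call, and no locking is needed because no instance is shared between threads.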
[3/3] storm git commit: add STORM-1680 to CHANGELOG.md
add STORM-1680 to CHANGELOG.md

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ce36b6db
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ce36b6db
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ce36b6db

Branch: refs/heads/master
Commit: ce36b6dbf0372c704964e5e854f7863960447a49
Parents: c845781
Author: Jungtaek Lim
Authored: Wed Apr 20 16:31:42 2016 +0900
Committer: Jungtaek Lim
Committed: Wed Apr 20 16:31:42 2016 +0900

--
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/ce36b6db/CHANGELOG.md
--
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0fb0134..b02c866 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -78,6 +78,7 @@
  * STORM-1521: When using Kerberos login from keytab with multiple bolts/executors ticket is not renewed in hbase bolt.
 
 ## 1.0.1
+ * STORM-1680: Provide configuration to set min fetch size in KafkaSpout
  * STORM-1649: Optimize Kryo instaces creation in trident windowing
  * STORM-1696: status not sync if zk fails in backpressure
  * STORM-1693: Move stats cleanup to executor shutdown
[1/3] storm git commit: STORM-1680, Added Kafka Spout Config FetchByte to Storm-Kafka
Repository: storm
Updated Branches:
  refs/heads/master d593918a5 -> ce36b6dbf

STORM-1680, Added Kafka Spout Config FetchByte to Storm-Kafka

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8b0af500
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8b0af500
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8b0af500

Branch: refs/heads/master
Commit: 8b0af5004d8472117d709caa8af997c6842d982a
Parents: d593918
Author: narendra_bidari
Authored: Mon Apr 4 12:28:09 2016 -0700
Committer: Jungtaek Lim
Committed: Wed Apr 20 16:31:02 2016 +0900

--
 .../storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java | 5 -
 .../storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java | 2 +-
 2 files changed, 5 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/8b0af500/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
--
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
index e1e1d24..c845531 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
@@ -17,10 +17,12 @@
  */
 package org.apache.storm.kafka;
 
+import java.io.Serializable;
+
 import org.apache.storm.spout.MultiScheme;
 import org.apache.storm.spout.RawMultiScheme;
 
-import java.io.Serializable;
+import kafka.api.FetchRequest;
 
 public class KafkaConfig implements Serializable {
     private static final long serialVersionUID = 5276718734571623855L;
@@ -39,6 +41,7 @@ public class KafkaConfig implements Serializable {
     public long maxOffsetBehind = Long.MAX_VALUE;
     public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
     public int metricsTimeBucketSizeInSecs = 60;
+    public int minFetchByte = FetchRequest.DefaultMinBytes();
 
     public KafkaConfig(BrokerHosts hosts, String topic) {
         this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());

http://git-wip-us.apache.org/repos/asf/storm/blob/8b0af500/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
--
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
index 7fa4340..090b6d1 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
@@ -189,7 +189,7 @@ public class KafkaUtils {
         int partitionId = partition.partition;
         FetchRequestBuilder builder = new FetchRequestBuilder();
         FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
-                clientId(config.clientId).maxWait(config.fetchMaxWait).build();
+                clientId(config.clientId).maxWait(config.fetchMaxWait).minBytes(config.minFetchByte).build();
         FetchResponse fetchResponse;
         try {
             fetchResponse = consumer.fetch(fetchRequest);
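The new `minFetchByte` field is passed to the fetch request as `minBytes`, next to the existing `maxWait`. In Kafka's fetch protocol the broker answers once at least `minBytes` of data are available or once `maxWait` has elapsed, whichever happens first. The sketch below models only that decision rule. `FetchWaitSketch`, `Settings` and `brokerWouldRespond` are hypothetical names, and the field values are illustrative rather than Storm's or Kafka's defaults.

```java
// Hypothetical model of how minBytes and maxWait interact on the broker side.
final class FetchWaitSketch {
    // Mirrors only the two KafkaConfig fields used here; the values are illustrative.
    static final class Settings {
        int fetchMaxWait = 10000; // milliseconds the broker may hold the request
        int minFetchByte = 0;     // bytes that must be available for an early reply
    }

    // True when the broker would answer: enough data is ready, or the wait limit is reached.
    static boolean brokerWouldRespond(Settings settings, int availableBytes, long waitedMs) {
        return availableBytes >= settings.minFetchByte || waitedMs >= settings.fetchMaxWait;
    }

    public static void main(String[] args) {
        Settings settings = new Settings();
        settings.minFetchByte = 1024;
        System.out.println(brokerWouldRespond(settings, 100, 0L));
        System.out.println(brokerWouldRespond(settings, 2048, 0L));
        System.out.println(brokerWouldRespond(settings, 100, 10000L));
    }
}
```

With a `minFetchByte` of zero the first condition always holds, so the broker replies immediately even when no data is waiting; raising the value trades latency for fewer, larger fetch responses.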
[3/3] storm git commit: add STORM-1680 to CHANGELOG.md
add STORM-1680 to CHANGELOG.md

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/39e8f0e9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/39e8f0e9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/39e8f0e9

Branch: refs/heads/1.x-branch
Commit: 39e8f0e9c2cdea8fb73c37a71a4b8d6f6029052b
Parents: e603cfe
Author: Jungtaek Lim
Authored: Wed Apr 20 16:30:32 2016 +0900
Committer: Jungtaek Lim
Committed: Wed Apr 20 16:30:32 2016 +0900

--
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/39e8f0e9/CHANGELOG.md
--
diff --git a/CHANGELOG.md b/CHANGELOG.md
index bb2f795..401c03f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.0.1
+ * STORM-1680: Provide configuration to set min fetch size in KafkaSpout
  * STORM-1649: Optimize Kryo instaces creation in HBaseWindowsStore
  * STORM-1696: status not sync if zk fails in backpressure
  * STORM-1693: Move stats cleanup to executor shutdown
[2/3] storm git commit: Merge branch '1.x-branch' of https://github.com/Symantec/storm into STORM-1680-1.x
Merge branch '1.x-branch' of https://github.com/Symantec/storm into STORM-1680-1.x

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e603cfef
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e603cfef
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e603cfef

Branch: refs/heads/1.x-branch
Commit: e603cfefbbe84d8b65debd7a98985d9481e5aead
Parents: ac7b828 89f4d44
Author: Jungtaek Lim
Authored: Wed Apr 20 16:29:17 2016 +0900
Committer: Jungtaek Lim
Committed: Wed Apr 20 16:29:17 2016 +0900

--
 .../storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java | 5 -
 .../storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java | 2 +-
 2 files changed, 5 insertions(+), 2 deletions(-)
--
[2/3] storm git commit: Merge branch 'STORM-1680'
Merge branch 'STORM-1680'

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c8457812
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c8457812
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c8457812

Branch: refs/heads/master
Commit: c8457812805b658d895445ce58d2583d1cdfaa59
Parents: d593918 8b0af50
Author: Jungtaek Lim
Authored: Wed Apr 20 16:31:10 2016 +0900
Committer: Jungtaek Lim
Committed: Wed Apr 20 16:31:10 2016 +0900

--
 .../storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java | 5 -
 .../storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java | 2 +-
 2 files changed, 5 insertions(+), 2 deletions(-)
--
[1/3] storm git commit: STORM-1680, Added Kafka Spout Config FetchByte to Storm-Kafka
Repository: storm
Updated Branches:
  refs/heads/1.x-branch ac7b828ad -> 39e8f0e9c

STORM-1680, Added Kafka Spout Config FetchByte to Storm-Kafka

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/89f4d44d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/89f4d44d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/89f4d44d

Branch: refs/heads/1.x-branch
Commit: 89f4d44dee45567d6d855842856d5439f9501987
Parents: a240df5
Author: narendra_bidari
Authored: Mon Apr 4 12:28:09 2016 -0700
Committer: narendra_bidari
Committed: Mon Apr 4 12:28:09 2016 -0700

--
 .../storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java | 5 -
 .../storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java | 2 +-
 2 files changed, 5 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/89f4d44d/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
--
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
index e1e1d24..c845531 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
@@ -17,10 +17,12 @@
  */
 package org.apache.storm.kafka;
 
+import java.io.Serializable;
+
 import org.apache.storm.spout.MultiScheme;
 import org.apache.storm.spout.RawMultiScheme;
 
-import java.io.Serializable;
+import kafka.api.FetchRequest;
 
 public class KafkaConfig implements Serializable {
     private static final long serialVersionUID = 5276718734571623855L;
@@ -39,6 +41,7 @@ public class KafkaConfig implements Serializable {
     public long maxOffsetBehind = Long.MAX_VALUE;
     public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
     public int metricsTimeBucketSizeInSecs = 60;
+    public int minFetchByte = FetchRequest.DefaultMinBytes();
 
     public KafkaConfig(BrokerHosts hosts, String topic) {
         this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());

http://git-wip-us.apache.org/repos/asf/storm/blob/89f4d44d/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
--
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
index a2be825..9eb8268 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
@@ -185,7 +185,7 @@ public class KafkaUtils {
         int partitionId = partition.partition;
         FetchRequestBuilder builder = new FetchRequestBuilder();
         FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
-                clientId(config.clientId).maxWait(config.fetchMaxWait).build();
+                clientId(config.clientId).maxWait(config.fetchMaxWait).minBytes(config.minFetchByte).build();
         FetchResponse fetchResponse;
         try {
             fetchResponse = consumer.fetch(fetchRequest);
[1/3] storm git commit: only check non-system streams by default
Repository: storm
Updated Branches:
  refs/heads/1.x-branch 39e8f0e9c -> a7370a62f

only check non-system streams by default

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6057fbf6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6057fbf6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6057fbf6

Branch: refs/heads/1.x-branch
Commit: 6057fbf6f98f6916917716f57fbe3f473f655b6e
Parents: a240df5
Author: Derek Dagit
Authored: Mon Apr 4 13:24:21 2016 -0500
Committer: Derek Dagit
Committed: Mon Apr 4 13:24:21 2016 -0500

--
 storm-core/src/clj/org/apache/storm/ui/core.clj | 12 +++-
 1 file changed, 3 insertions(+), 9 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/6057fbf6/storm-core/src/clj/org/apache/storm/ui/core.clj
--
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index c35e051..1ab2d33 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -109,14 +109,6 @@
   [topology ^ExecutorSummary s]
   (component-type topology (.get_component_id s)))
 
-(defn is-ack-stream
-  [stream]
-  (let [acker-streams
-        [ACKER-INIT-STREAM-ID
-         ACKER-ACK-STREAM-ID
-         ACKER-FAIL-STREAM-ID]]
-    (every? #(not= %1 stream) acker-streams)))
-
 (defn spout-summary?
   [topology s]
   (= :spout (executor-summary-type topology s)))
@@ -272,7 +264,9 @@
                  (for [m (get v :inputs)]
                    {:stream (get m :stream)
                     :sani-stream (get m :sani-stream)
-                    :checked (is-ack-stream (get m :stream))}))]
+                    :checked (not
+                               (Utils/isSystemId
+                                 (get m :stream)))}))]
     (map
       (fn [row]
         {:row row}) (partition 4 4 nil streams
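The diff above changes which input streams start out checked in the UI visualization. The removed `is-ack-stream` helper, despite its name, returned true for every stream that was not one of the three acker streams; the replacement unchecks anything `Utils/isSystemId` reports as a system id. The sketch below contrasts the two rules. It assumes that a system id is one starting with `__` and that the acker stream ids are `__ack_init`, `__ack_ack` and `__ack_fail`. `StreamCheckSketch` and its methods are hypothetical stand-ins and do not call Storm.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical comparison of the old and new "checked by default" rules.
final class StreamCheckSketch {
    // Assumed values of the three acker stream ids.
    static final List<String> ACKER_STREAMS =
            Arrays.asList("__ack_init", "__ack_ack", "__ack_fail");

    // Old rule: checked unless the stream is one of the acker streams.
    static boolean checkedBefore(String stream) {
        return !ACKER_STREAMS.contains(stream);
    }

    // Assumed behaviour of Utils/isSystemId: ids with a "__" prefix are system ids.
    static boolean isSystemId(String id) {
        return id.startsWith("__");
    }

    // New rule: checked only for non-system streams.
    static boolean checkedAfter(String stream) {
        return !isSystemId(stream);
    }

    public static void main(String[] args) {
        for (String stream : Arrays.asList("default", "__ack_ack", "__metrics")) {
            System.out.println(stream + " before=" + checkedBefore(stream)
                    + " after=" + checkedAfter(stream));
        }
    }
}
```

Under these assumptions the two rules agree on user streams and acker streams and differ only for other system streams such as `__metrics`, which the old rule left checked.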
[2/3] storm git commit: Merge branch 'storm-1683-ui-viz-uncheck-sys-streams' of https://github.com/d2r/storm into STORM-1683-1.x
Merge branch 'storm-1683-ui-viz-uncheck-sys-streams' of https://github.com/d2r/storm into STORM-1683-1.x

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b191c623
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b191c623
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b191c623

Branch: refs/heads/1.x-branch
Commit: b191c623c7fef5fe5c329f0f179130b953da25c7
Parents: 39e8f0e 6057fbf
Author: Jungtaek Lim
Authored: Wed Apr 20 16:45:53 2016 +0900
Committer: Jungtaek Lim
Committed: Wed Apr 20 16:45:53 2016 +0900

--
 storm-core/src/clj/org/apache/storm/ui/core.clj | 12 +++-
 1 file changed, 3 insertions(+), 9 deletions(-)
--
[1/3] storm git commit: only check non-system streams by default
Repository: storm
Updated Branches:
  refs/heads/master ce36b6dbf -> e6b57ce4b

only check non-system streams by default

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e5ed7ffe
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e5ed7ffe
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e5ed7ffe

Branch: refs/heads/master
Commit: e5ed7ffe5a81e3c6ecf106f21cc84dd2bbe0a927
Parents: f631516
Author: Derek Dagit
Authored: Mon Apr 4 13:24:21 2016 -0500
Committer: Derek Dagit
Committed: Wed Apr 6 11:20:24 2016 -0700

--
 storm-core/src/clj/org/apache/storm/ui/core.clj | 12 +++-
 1 file changed, 3 insertions(+), 9 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/e5ed7ffe/storm-core/src/clj/org/apache/storm/ui/core.clj
--
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index 885d754..17d9205 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -112,14 +112,6 @@
   [topology ^ExecutorSummary s]
   (StatsUtil/componentType topology (.get_component_id s)))
 
-(defn is-ack-stream
-  [stream]
-  (let [acker-streams
-        [Acker/ACKER_INIT_STREAM_ID
-         Acker/ACKER_ACK_STREAM_ID
-         Acker/ACKER_FAIL_STREAM_ID]]
-    (every? #(not= %1 stream) acker-streams)))
-
 (defn spout-summary?
   [topology s]
   (= "spout" (executor-summary-type topology s)))
@@ -283,7 +275,9 @@
                  (for [m (get v :inputs)]
                    {:stream (get m :stream)
                     :sani-stream (get m :sani-stream)
-                    :checked (is-ack-stream (get m :stream))}))]
+                    :checked (not
+                               (Utils/isSystemId
+                                 (get m :stream)))}))]
     (map
       (fn [row]
         {:row row}) (partition 4 4 nil streams
[2/3] storm git commit: Merge branch 'storm-1683-ui-viz-uncheck-sys-streams-master' of https://github.com/d2r/storm into STORM-1683
Merge branch 'storm-1683-ui-viz-uncheck-sys-streams-master' of https://github.com/d2r/storm into STORM-1683

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fbf1e84a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fbf1e84a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fbf1e84a

Branch: refs/heads/master
Commit: fbf1e84a2fbcaa45588af51ddcb233e7600d81b9
Parents: ce36b6d e5ed7ff
Author: Jungtaek Lim
Authored: Wed Apr 20 16:44:26 2016 +0900
Committer: Jungtaek Lim
Committed: Wed Apr 20 16:44:26 2016 +0900

--
 storm-core/src/clj/org/apache/storm/ui/core.clj | 12 +++-
 1 file changed, 3 insertions(+), 9 deletions(-)
--
[3/3] storm git commit: add STORM-1683 to CHANGELOG.md
add STORM-1683 to CHANGELOG.md

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a7370a62
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a7370a62
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a7370a62

Branch: refs/heads/1.x-branch
Commit: a7370a62fb2fb55fc182588658dd644b893c0f3e
Parents: b191c62
Author: Jungtaek Lim
Authored: Wed Apr 20 16:46:23 2016 +0900
Committer: Jungtaek Lim
Committed: Wed Apr 20 16:46:23 2016 +0900

--
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/a7370a62/CHANGELOG.md
--
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 401c03f..dbf2345 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.0.1
+ * STORM-1683: only check non-system streams by default
  * STORM-1680: Provide configuration to set min fetch size in KafkaSpout
  * STORM-1649: Optimize Kryo instaces creation in HBaseWindowsStore
  * STORM-1696: status not sync if zk fails in backpressure
[3/3] storm git commit: add STORM-1683 to CHANGELOG.md
add STORM-1683 to CHANGELOG.md

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e6b57ce4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e6b57ce4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e6b57ce4

Branch: refs/heads/master
Commit: e6b57ce4bb0acd11631350501461c06a608e4ed9
Parents: fbf1e84
Author: Jungtaek Lim
Authored: Wed Apr 20 16:44:58 2016 +0900
Committer: Jungtaek Lim
Committed: Wed Apr 20 16:44:58 2016 +0900

--
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/e6b57ce4/CHANGELOG.md
--
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b02c866..04e9c34 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -78,6 +78,7 @@
  * STORM-1521: When using Kerberos login from keytab with multiple bolts/executors ticket is not renewed in hbase bolt.
 
 ## 1.0.1
+ * STORM-1683: only check non-system streams by default
  * STORM-1680: Provide configuration to set min fetch size in KafkaSpout
  * STORM-1649: Optimize Kryo instaces creation in trident windowing
  * STORM-1696: status not sync if zk fails in backpressure
[3/4] storm git commit: Merge branch 'STORM-1714-1.x' into 1.x-branch
Merge branch 'STORM-1714-1.x' into 1.x-branch

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0ba4bb51
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0ba4bb51
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0ba4bb51

Branch: refs/heads/1.x-branch
Commit: 0ba4bb5100efff705f3b62b78ad476c2591d7eb2
Parents: a7370a6 95e7afc
Author: Jungtaek Lim
Authored: Wed Apr 20 17:23:12 2016 +0900
Committer: Jungtaek Lim
Committed: Wed Apr 20 17:23:12 2016 +0900

--
 .../topology/BaseStatefulBoltExecutor.java | 209 +++
 .../topology/CheckpointTupleForwarder.java | 159 +-
 .../apache/storm/topology/IStatefulBolt.java| 19 +-
 .../storm/topology/StatefulBoltExecutor.java| 22 +-
 .../apache/storm/topology/TopologyBuilder.java | 48 +
 5 files changed, 301 insertions(+), 156 deletions(-)
--
[2/4] storm git commit: [STORM-1714] refactored common logic into BaseStatefulBoltExecutor
[STORM-1714] refactored common logic into BaseStatefulBoltExecutor

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/95e7afc2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/95e7afc2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/95e7afc2

Branch: refs/heads/1.x-branch
Commit: 95e7afc29cde1b7f16c1b0ab8ad65bbd9ed9a2af
Parents: 3b879d6
Author: Arun Mahadevan
Authored: Mon Apr 18 23:17:10 2016 +0530
Committer: Jungtaek Lim
Committed: Wed Apr 20 17:18:17 2016 +0900

--
 .../topology/BaseStatefulBoltExecutor.java | 209 +++
 .../topology/CheckpointTupleForwarder.java | 165 +--
 .../storm/topology/StatefulBoltExecutor.java| 4 +-
 3 files changed, 218 insertions(+), 160 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/95e7afc2/storm-core/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java
--
diff --git a/storm-core/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java
new file mode 100644
index 000..b93a061
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.topology;
+
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.spout.CheckPointState;
+import org.apache.storm.spout.CheckpointSpout;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.storm.spout.CheckPointState.Action.ROLLBACK;
+import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_FIELD_ACTION;
+import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_FIELD_TXID;
+import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_STREAM_ID;
+
+/**
+ * Base class that abstracts the common logic for executing bolts in a stateful topology.
+ */
+public abstract class BaseStatefulBoltExecutor implements IRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(BaseStatefulBoltExecutor.class);
+    private final Map transactionRequestCount;
+    private int checkPointInputTaskCount;
+    private long lastTxid = Long.MIN_VALUE;
+    protected OutputCollector collector;
+
+    public BaseStatefulBoltExecutor() {
+        transactionRequestCount = new HashMap<>();
+    }
+
+    protected void init(TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        this.checkPointInputTaskCount = getCheckpointInputTaskCount(context);
+    }
+
+    /**
+     * returns the total number of input checkpoint streams across
+     * all input tasks to this component.
+     */
+    private int getCheckpointInputTaskCount(TopologyContext context) {
+        int count = 0;
+        for (GlobalStreamId inputStream : context.getThisSources().keySet()) {
+            if (CHECKPOINT_STREAM_ID.equals(inputStream.get_streamId())) {
+                count += context.getComponentTasks(inputStream.get_componentId()).size();
+            }
+        }
+        return count;
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        if (CheckpointSpout.isCheckpoint(input)) {
+            processCheckpoint(input);
+        } else {
+            handleTuple(input);
+        }
+    }
+
+    /**
+     * Invokes handleCheckpoint once checkpoint tuple is received on
+     * all input checkpoint streams to this component.
+     */
+    private void processCheckpoint(Tuple input) {
+        CheckPointState.Action action = (CheckPointState.Action) input.getValueByField(CHECKPOINT_FIELD_ACTION);
+        long txid = input.getL
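The Javadoc on `processCheckpoint` says `handleCheckpoint` runs only once a checkpoint tuple has arrived on all input checkpoint streams, and the class keeps a `transactionRequestCount` map and a `checkPointInputTaskCount` for that purpose. The method body is cut off in this message, so what follows is only a sketch of the counting idea, not the committed implementation. `CheckpointBarrierSketch`, `Request` and `onCheckpoint` are hypothetical names, and actions are plain strings here instead of `CheckPointState.Action`. The `lastTxid` field and the `ROLLBACK` import seen in the source are not modelled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch: reports completion once a checkpoint (txid, action)
// has been seen from every input task.
final class CheckpointBarrierSketch {
    // Key identifying one checkpoint request; both fields take part in equality.
    static final class Request {
        final long txid;
        final String action;

        Request(long txid, String action) {
            this.txid = txid;
            this.action = action;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Request)) {
                return false;
            }
            Request other = (Request) o;
            return txid == other.txid && action.equals(other.action);
        }

        @Override
        public int hashCode() {
            return Objects.hash(txid, action);
        }
    }

    private final Map<Request, Integer> seen = new HashMap<>();
    private final int inputTaskCount;

    CheckpointBarrierSketch(int inputTaskCount) {
        this.inputTaskCount = inputTaskCount;
    }

    // Returns true only for the tuple that completes the set for this (txid, action).
    boolean onCheckpoint(long txid, String action) {
        Request request = new Request(txid, action);
        int count = seen.merge(request, 1, Integer::sum);
        if (count < inputTaskCount) {
            return false;
        }
        seen.remove(request); // reset so the map does not grow without bound
        return true;
    }

    public static void main(String[] args) {
        CheckpointBarrierSketch barrier = new CheckpointBarrierSketch(3);
        System.out.println(barrier.onCheckpoint(1L, "COMMIT"));
        System.out.println(barrier.onCheckpoint(1L, "COMMIT"));
        System.out.println(barrier.onCheckpoint(1L, "COMMIT"));
    }
}
```

Keying the counter on both the transaction id and the action keeps tuples from different checkpoint phases of the same transaction from being counted together.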
[1/4] storm git commit: [STORM-1714] StatefulBolts ends up as normal bolts while using TopologyBuilder.setBolt without parallelism
Repository: storm
Updated Branches:
  refs/heads/master e6b57ce4b -> 1677de1a1

[STORM-1714] StatefulBolts ends up as normal bolts while using TopologyBuilder.setBolt without parallelism

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8868a58c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8868a58c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8868a58c

Branch: refs/heads/master
Commit: 8868a58c27b6baee3d8fa1487434fc58509e514d
Parents: 97e131d
Author: Arun Mahadevan
Authored: Fri Apr 15 17:36:07 2016 +0530
Committer: Arun Mahadevan
Committed: Fri Apr 15 20:53:36 2016 +0530

--
 .../topology/CheckpointTupleForwarder.java | 8
 .../apache/storm/topology/IStatefulBolt.java| 19 +++-
 .../storm/topology/StatefulBoltExecutor.java| 20 ++--
 .../apache/storm/topology/TopologyBuilder.java | 48
 4 files changed, 91 insertions(+), 4 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/8868a58c/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
--
diff --git a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
index 11d0384..9d21c33 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
@@ -53,6 +53,10 @@ public class CheckpointTupleForwarder implements IRichBolt {
     private long lastTxid = Long.MIN_VALUE;
     private AnchoringOutputCollector collector;
 
+    public CheckpointTupleForwarder() {
+        this(null);
+    }
+
     public CheckpointTupleForwarder(IRichBolt bolt) {
         this.bolt = bolt;
         transactionRequestCount = new HashMap<>();
@@ -86,6 +90,10 @@ public class CheckpointTupleForwarder implements IRichBolt {
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         bolt.declareOutputFields(declarer);
+        declareCheckpointStream(declarer);
+    }
+
+    protected void declareCheckpointStream(OutputFieldsDeclarer declarer) {
         declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/8868a58c/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
--
diff --git a/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java b/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
index ed55e1d..ef6c837 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
@@ -18,6 +18,11 @@
 package org.apache.storm.topology;
 
 import org.apache.storm.state.State;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.Map;
 
 /**
  * A bolt abstraction for supporting stateful computation. The state of the bolt is
@@ -27,5 +32,17 @@ import org.apache.storm.state.State;
  * state updates. The stateful bolts are expected to anchor the tuples while emitting
  * and ack the input tuples once its processed.
  */
-public interface IStatefulBolt extends IStatefulComponent, IRichBolt {
+public interface IStatefulBolt extends IStatefulComponent {
+    /**
+     * @see org.apache.storm.task.IBolt#prepare(Map, TopologyContext, OutputCollector)
+     */
+    void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
+    /**
+     * @see org.apache.storm.task.IBolt#execute(Tuple)
+     */
+    void execute(Tuple input);
+    /**
+     * @see org.apache.storm.task.IBolt#cleanup()
+     */
+    void cleanup();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/8868a58c/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
--
diff --git a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
index 237305e..9873084 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -40,7 +39,6 @@ import static org.apache.storm.spout.CheckPointState.Action.COMMIT;
 import static org.apache.storm.sp
[3/4] storm git commit: Merge branch 'STORM-1714' of https://github.com/arunmahadevan/storm into STORM-1714
Merge branch 'STORM-1714' of https://github.com/arunmahadevan/storm into STORM-1714

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3e8edcab
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3e8edcab
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3e8edcab

Branch: refs/heads/master
Commit: 3e8edcabf1dc7f39e5f6103a0a461d66ede6bf6c
Parents: e6b57ce f2a02bf
Author: Jungtaek Lim
Authored: Wed Apr 20 17:07:15 2016 +0900
Committer: Jungtaek Lim
Committed: Wed Apr 20 17:07:15 2016 +0900

--
 .../topology/BaseStatefulBoltExecutor.java | 209 +++
 .../topology/CheckpointTupleForwarder.java | 159 +-
 .../apache/storm/topology/IStatefulBolt.java| 19 +-
 .../storm/topology/StatefulBoltExecutor.java| 22 +-
 .../apache/storm/topology/TopologyBuilder.java | 48 +
 5 files changed, 301 insertions(+), 156 deletions(-)
--
[4/4] storm git commit: add STORM-1714 to CHANGELOG.md
add STORM-1714 to CHANGELOG.md

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1677de1a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1677de1a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1677de1a

Branch: refs/heads/master
Commit: 1677de1a1a4177b0f2f3cacdf4152b7e9fa976bd
Parents: 3e8edca
Author: Jungtaek Lim
Authored: Wed Apr 20 17:17:39 2016 +0900
Committer: Jungtaek Lim
Committed: Wed Apr 20 17:17:39 2016 +0900

--
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/1677de1a/CHANGELOG.md
--
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 04e9c34..9344f3e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -78,6 +78,7 @@
  * STORM-1521: When using Kerberos login from keytab with multiple bolts/executors ticket is not renewed in hbase bolt.
 
 ## 1.0.1
+ * STORM-1714: StatefulBolts ends up as normal bolts while using TopologyBuilder.setBolt without parallelism
  * STORM-1683: only check non-system streams by default
  * STORM-1680: Provide configuration to set min fetch size in KafkaSpout
  * STORM-1649: Optimize Kryo instaces creation in trident windowing
[2/4] storm git commit: [STORM-1714] refactored common logic into BaseStatefulBoltExecutor
[STORM-1714] refactored common logic into BaseStatefulBoltExecutor Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f2a02bff Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f2a02bff Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f2a02bff Branch: refs/heads/master Commit: f2a02bff16401fd27d91c6efee951ecb528a1b7b Parents: 8868a58 Author: Arun Mahadevan Authored: Mon Apr 18 23:17:10 2016 +0530 Committer: Arun Mahadevan Committed: Tue Apr 19 23:18:56 2016 +0530 -- .../topology/BaseStatefulBoltExecutor.java | 209 +++ .../topology/CheckpointTupleForwarder.java | 165 +-- .../storm/topology/StatefulBoltExecutor.java| 4 +- 3 files changed, 218 insertions(+), 160 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/f2a02bff/storm-core/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java -- diff --git a/storm-core/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java new file mode 100644 index 000..b93a061 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.topology; + +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.spout.CheckPointState; +import org.apache.storm.spout.CheckpointSpout; +import org.apache.storm.task.IOutputCollector; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.storm.spout.CheckPointState.Action.ROLLBACK; +import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_FIELD_ACTION; +import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_FIELD_TXID; +import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_STREAM_ID; + +/** + * Base class that abstracts the common logic for executing bolts in a stateful topology. + */ +public abstract class BaseStatefulBoltExecutor implements IRichBolt { +private static final Logger LOG = LoggerFactory.getLogger(BaseStatefulBoltExecutor.class); +private final Map transactionRequestCount; +private int checkPointInputTaskCount; +private long lastTxid = Long.MIN_VALUE; +protected OutputCollector collector; + +public BaseStatefulBoltExecutor() { +transactionRequestCount = new HashMap<>(); +} + +protected void init(TopologyContext context, OutputCollector collector) { +this.collector = collector; +this.checkPointInputTaskCount = getCheckpointInputTaskCount(context); +} + +/** + * returns the total number of input checkpoint streams across + * all input tasks to this component. 
+ */ +private int getCheckpointInputTaskCount(TopologyContext context) { +int count = 0; +for (GlobalStreamId inputStream : context.getThisSources().keySet()) { +if (CHECKPOINT_STREAM_ID.equals(inputStream.get_streamId())) { +count += context.getComponentTasks(inputStream.get_componentId()).size(); +} +} +return count; +} + +@Override +public void execute(Tuple input) { +if (CheckpointSpout.isCheckpoint(input)) { +processCheckpoint(input); +} else { +handleTuple(input); +} +} + +/** + * Invokes handleCheckpoint once checkpoint tuple is received on + * all input checkpoint streams to this component. + */ +private void processCheckpoint(Tuple input) { +CheckPointState.Action action = (CheckPointState.Action) input.getValueByField(CHECKPOINT_FIELD_ACTION); +long txid = input.getLon
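The counting rule stated in the `processCheckpoint` comment above (act only once a checkpoint tuple has arrived from every upstream checkpoint task) can be sketched in isolation. This is a hedged illustration, not the code from this commit: the `CheckpointBarrier` class, its `onCheckpoint` method, and the string key are hypothetical names, and the executor's `lastTxid` and rollback handling are deliberately not modeled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: counts checkpoint tuples per (txid, action) pair and
// reports when the expected number of upstream tasks has been heard from.
final class CheckpointBarrier {
    private final int expectedInputs;
    private final Map<String, Integer> requestCount = new HashMap<>();

    CheckpointBarrier(int expectedInputs) {
        this.expectedInputs = expectedInputs;
    }

    // Returns true only for the tuple that completes the set for this txid/action.
    boolean onCheckpoint(long txid, String action) {
        String key = txid + ":" + action;
        int seen = requestCount.merge(key, 1, Integer::sum);
        if (seen == expectedInputs) {
            requestCount.remove(key); // reset so the same key can be counted again
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CheckpointBarrier barrier = new CheckpointBarrier(3);
        System.out.println(barrier.onCheckpoint(1L, "PREPARE")); // false
        System.out.println(barrier.onCheckpoint(1L, "PREPARE")); // false
        System.out.println(barrier.onCheckpoint(1L, "PREPARE")); // true
    }
}
```

Here `expectedInputs` plays the role of the value computed by `getCheckpointInputTaskCount`, which sums the task counts of every component feeding the checkpoint stream.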
[1/4] storm git commit: [STORM-1714] StatefulBolts ends up as normal bolts while using TopologyBuilder.setBolt without parallelism
Repository: storm Updated Branches: refs/heads/1.x-branch a7370a62f -> 3399a4a46 [STORM-1714] StatefulBolts ends up as normal bolts while using TopologyBuilder.setBolt without parallelism Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3b879d62 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3b879d62 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3b879d62 Branch: refs/heads/1.x-branch Commit: 3b879d62d68ac75310d12784d8d1ad4213fdd38e Parents: a7370a6 Author: Arun Mahadevan Authored: Fri Apr 15 17:36:07 2016 +0530 Committer: Jungtaek Lim Committed: Wed Apr 20 17:18:12 2016 +0900 -- .../topology/CheckpointTupleForwarder.java | 8 .../apache/storm/topology/IStatefulBolt.java| 19 +++- .../storm/topology/StatefulBoltExecutor.java| 20 ++-- .../apache/storm/topology/TopologyBuilder.java | 48 4 files changed, 91 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/3b879d62/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java -- diff --git a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java index 11d0384..9d21c33 100644 --- a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java +++ b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java @@ -53,6 +53,10 @@ public class CheckpointTupleForwarder implements IRichBolt { private long lastTxid = Long.MIN_VALUE; private AnchoringOutputCollector collector; +public CheckpointTupleForwarder() { +this(null); +} + public CheckpointTupleForwarder(IRichBolt bolt) { this.bolt = bolt; transactionRequestCount = new HashMap<>(); @@ -86,6 +90,10 @@ public class CheckpointTupleForwarder implements IRichBolt { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { bolt.declareOutputFields(declarer); +declareCheckpointStream(declarer); +} + 
+protected void declareCheckpointStream(OutputFieldsDeclarer declarer) { declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION)); } http://git-wip-us.apache.org/repos/asf/storm/blob/3b879d62/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java -- diff --git a/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java b/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java index ed55e1d..ef6c837 100644 --- a/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java +++ b/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java @@ -18,6 +18,11 @@ package org.apache.storm.topology; import org.apache.storm.state.State; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Tuple; + +import java.util.Map; /** * A bolt abstraction for supporting stateful computation. The state of the bolt is @@ -27,5 +32,17 @@ import org.apache.storm.state.State; * state updates. The stateful bolts are expected to anchor the tuples while emitting * and ack the input tuples once its processed. 
*/ -public interface IStatefulBolt extends IStatefulComponent, IRichBolt { +public interface IStatefulBolt extends IStatefulComponent { +/** + * @see org.apache.storm.task.IBolt#prepare(Map, TopologyContext, OutputCollector) + */ +void prepare(Map stormConf, TopologyContext context, OutputCollector collector); +/** + * @see org.apache.storm.task.IBolt#execute(Tuple) + */ +void execute(Tuple input); +/** + * @see org.apache.storm.task.IBolt#cleanup() + */ +void cleanup(); } http://git-wip-us.apache.org/repos/asf/storm/blob/3b879d62/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java -- diff --git a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java index 237305e..9873084 100644 --- a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java +++ b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java @@ -28,7 +28,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -40,7 +39,6 @@ import static org.apache.storm.spout.CheckPointState.Action.COMMIT; import static org.apache.st
[4/4] storm git commit: add STORM-1714 to CHANGELOG.md
add STORM-1714 to CHANGELOG.md Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3399a4a4 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3399a4a4 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3399a4a4 Branch: refs/heads/1.x-branch Commit: 3399a4a46bffaa708c30c6f3e8b15e666c6c6151 Parents: 0ba4bb5 Author: Jungtaek Lim Authored: Wed Apr 20 17:23:28 2016 +0900 Committer: Jungtaek Lim Committed: Wed Apr 20 17:23:28 2016 +0900 -- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/3399a4a4/CHANGELOG.md -- diff --git a/CHANGELOG.md b/CHANGELOG.md index dbf2345..96c3fe3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ ## 1.0.1 + * STORM-1714: StatefulBolts ends up as normal bolts while using TopologyBuilder.setBolt without parallelism * STORM-1683: only check non-system streams by default * STORM-1680: Provide configuration to set min fetch size in KafkaSpout * STORM-1649: Optimize Kryo instaces creation in HBaseWindowsStore
[3/3] storm git commit: add STORM-1704 to CHANGELOG.md
add STORM-1704 to CHANGELOG.md Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9c6e6dfd Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9c6e6dfd Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9c6e6dfd Branch: refs/heads/1.x-branch Commit: 9c6e6dfd269c2f381722ab4dd154cab2b0953076 Parents: a748776 Author: Jungtaek Lim Authored: Wed Apr 20 17:26:56 2016 +0900 Committer: Jungtaek Lim Committed: Wed Apr 20 17:26:56 2016 +0900 -- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/9c6e6dfd/CHANGELOG.md -- diff --git a/CHANGELOG.md b/CHANGELOG.md index 96c3fe3..7fc5bfb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ ## 1.0.1 + * STORM-1704: When logviewer_search.html opens daemon file, next search always show no result * STORM-1714: StatefulBolts ends up as normal bolts while using TopologyBuilder.setBolt without parallelism * STORM-1683: only check non-system streams by default * STORM-1680: Provide configuration to set min fetch size in KafkaSpout
[1/3] storm git commit: STORM-1704 When logviewer_search.html opens daemon file, next search always show no result
Repository: storm Updated Branches: refs/heads/1.x-branch 3399a4a46 -> 9c6e6dfd2 STORM-1704 When logviewer_search.html opens daemon file, next search always show no result * ensures that is-daemon parameter is passed around multiple searches * set logviewerUrl to '/daemonlog' when search is done with is-daemon=yes Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/449394a0 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/449394a0 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/449394a0 Branch: refs/heads/1.x-branch Commit: 449394a0c66f459da33253cb5ecbf22274dc0e40 Parents: 6af0d10 Author: Jungtaek Lim Authored: Mon Apr 11 16:40:58 2016 +0900 Committer: Jungtaek Lim Committed: Mon Apr 11 16:40:58 2016 +0900 -- .../clj/org/apache/storm/daemon/logviewer.clj | 77 +--- storm-core/src/ui/public/logviewer_search.html | 4 +- .../logviewer-search-page-template.html | 5 +- .../clj/org/apache/storm/logviewer_test.clj | 57 +-- 4 files changed, 106 insertions(+), 37 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/449394a0/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj -- diff --git a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj index 7baeae7..fec313b 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj @@ -526,13 +526,31 @@ (int (/ (alength needle) -2)))) ;; Addition :length default-bytes-per-page}))) +(defn url-to-match-centered-in-log-page-daemon-file + [needle fname offset port] + (let [host (local-hostname) +port (logviewer-port) +fname (clojure.string/join file-path-separator (take-last 1 (split fname (re-pattern file-path-separator))))] +(url (str "http://" host ":" port "/daemonlog") + {:file fname + :start (max 0 +(- offset + (int (/ default-bytes-per-page 2)) + (int (/ (alength needle) -2)))) ;; Addition + :length 
default-bytes-per-page}))) + (defnk mk-match-data [^bytes needle ^ByteBuffer haystack haystack-offset file-offset fname - :before-bytes nil :after-bytes nil] - (let [url (url-to-match-centered-in-log-page needle - fname - file-offset - (*STORM-CONF* LOGVIEWER-PORT)) + :is-daemon false :before-bytes nil :after-bytes nil] + (let [url (if is-daemon + (url-to-match-centered-in-log-page-daemon-file needle + fname + file-offset + (*STORM-CONF* LOGVIEWER-PORT)) + (url-to-match-centered-in-log-page needle + fname + file-offset + (*STORM-CONF* LOGVIEWER-PORT))) haystack-bytes (.array haystack) before-string (if (>= haystack-offset grep-context-size) (String. haystack-bytes @@ -628,7 +646,7 @@ "As the file is read into a buffer, 1/2 the buffer's size at a time, we search the buffer for matches of the substring and return a list of zero or more matches." - [file file-len offset-to-buf init-buf-offset stream bytes-skipped + [is-daemon file file-len offset-to-buf init-buf-offset stream bytes-skipped bytes-read ^ByteBuffer haystack ^bytes needle initial-matches num-matches ^bytes before-bytes] (loop [buf-offset init-buf-offset @@ -653,6 +671,7 @@ offset file-offset (.getCanonicalPath file) +:is-daemon is-daemon :before-bytes before-arg :after-bytes after-arg (let [before-str-to-offset (min (.limit haystack) @@ -709,7 +728,7 @@ context lines. Other information is included to be useful for progressively searching through a file for display in a UI. The search string must grep-max-search-size bytes or fewer when decoded with UTF-8." - [file ^String search-string :num-matches 10 :start-byte-offset 0] + [file ^String search-string :is-daemon false :num-matches 10 :start-byte-offset 0] {:pre [(not (empty? search-string)) (<= (count (.getBytes search-string "UTF-8")) grep-max-search-size)]} (let [zip-file? (.endsWith (.getName file) ".gz") @@ -744,7 +763,9 @@ byte-offset start-byte-offset before-bytes nil] (let [[matches new-byte-offset new-before-bytes] -
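The `:start` arithmetic in `url-to-match-centered-in-log-page-daemon-file` above is easy to misread, because dividing the needle length by `-2` and then subtracting the result actually adds half the needle length (hence the `;; Addition` comment). A minimal sketch of the same arithmetic, assuming a hypothetical `LogPageWindow` class and an illustrative page size of 51200 bytes (the real value of `default-bytes-per-page` is not shown in this diff):

```java
// Hypothetical helper mirroring the :start computation in the Clojure code.
final class LogPageWindow {
    // Start offset of a page of pageBytes bytes roughly centered on a match
    // of needleLength bytes found at matchOffset; never negative.
    static long centeredStart(long matchOffset, int pageBytes, int needleLength) {
        // (- offset (int (/ page 2)) (int (/ needle-len -2)))
        // Subtracting the negative half-needle adds it back.
        long start = matchOffset - (pageBytes / 2) - (needleLength / -2);
        return Math.max(0L, start);
    }

    public static void main(String[] args) {
        // Illustrative values only.
        System.out.println(centeredStart(100_000L, 51_200, 10)); // 74405
        System.out.println(centeredStart(100L, 51_200, 10));     // 0
    }
}
```

Java integer division truncates toward zero, which matches applying `int` to the Clojure ratio, so odd needle lengths round the same way in both versions.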
[3/3] storm git commit: add STORM-1704 to CHANGELOG.md
add STORM-1704 to CHANGELOG.md Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1a09da55 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1a09da55 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1a09da55 Branch: refs/heads/master Commit: 1a09da555c3aa7b0daa6d4348c6a35421a6dfeba Parents: 0ae7a48 Author: Jungtaek Lim Authored: Wed Apr 20 17:26:24 2016 +0900 Committer: Jungtaek Lim Committed: Wed Apr 20 17:26:24 2016 +0900 -- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/1a09da55/CHANGELOG.md -- diff --git a/CHANGELOG.md b/CHANGELOG.md index 9344f3e..e4e6f39 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -78,6 +78,7 @@ * STORM-1521: When using Kerberos login from keytab with multiple bolts/executors ticket is not renewed in hbase bolt. ## 1.0.1 + * STORM-1704: When logviewer_search.html opens daemon file, next search always show no result * STORM-1714: StatefulBolts ends up as normal bolts while using TopologyBuilder.setBolt without parallelism * STORM-1683: only check non-system streams by default * STORM-1680: Provide configuration to set min fetch size in KafkaSpout
[1/3] storm git commit: STORM-1704 When logviewer_search.html opens daemon file, next search always show no result
Repository: storm Updated Branches: refs/heads/master 1677de1a1 -> 1a09da555 STORM-1704 When logviewer_search.html opens daemon file, next search always show no result * ensures that is-daemon parameter is passed around multiple searches * set logviewerUrl to '/daemonlog' when search is done with is-daemon=yes Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/55692bb6 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/55692bb6 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/55692bb6 Branch: refs/heads/master Commit: 55692bb6364de87b0bb24db841c40d93047f9439 Parents: f48d794 Author: Jungtaek Lim Authored: Mon Apr 11 16:40:58 2016 +0900 Committer: Jungtaek Lim Committed: Mon Apr 11 16:55:13 2016 +0900 -- .../clj/org/apache/storm/daemon/logviewer.clj | 77 +--- storm-core/src/ui/public/logviewer_search.html | 4 +- .../logviewer-search-page-template.html | 5 +- .../clj/org/apache/storm/logviewer_test.clj | 57 +-- 4 files changed, 106 insertions(+), 37 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/55692bb6/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj -- diff --git a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj index 2b80fd8..7c62b32 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj @@ -538,13 +538,31 @@ (int (/ (alength needle) -2)))) ;; Addition :length default-bytes-per-page}))) +(defn url-to-match-centered-in-log-page-daemon-file + [needle fname offset port] + (let [host (Utils/localHostname) +port (logviewer-port) +fname (clojure.string/join Utils/FILE_PATH_SEPARATOR (take-last 1 (split fname (re-pattern Utils/FILE_PATH_SEPARATOR))))] +(url (str "http://" host ":" port "/daemonlog") + {:file fname + :start (max 0 +(- offset + (int (/ default-bytes-per-page 2)) + (int (/ (alength needle) -2)))) ;; Addition + :length 
default-bytes-per-page}))) + (defnk mk-match-data [^bytes needle ^ByteBuffer haystack haystack-offset file-offset fname - :before-bytes nil :after-bytes nil] - (let [url (url-to-match-centered-in-log-page needle - fname - file-offset - (*STORM-CONF* LOGVIEWER-PORT)) + :is-daemon false :before-bytes nil :after-bytes nil] + (let [url (if is-daemon + (url-to-match-centered-in-log-page-daemon-file needle + fname + file-offset + (*STORM-CONF* LOGVIEWER-PORT)) + (url-to-match-centered-in-log-page needle + fname + file-offset + (*STORM-CONF* LOGVIEWER-PORT))) haystack-bytes (.array haystack) before-string (if (>= haystack-offset grep-context-size) (String. haystack-bytes @@ -640,7 +658,7 @@ "As the file is read into a buffer, 1/2 the buffer's size at a time, we search the buffer for matches of the substring and return a list of zero or more matches." - [file file-len offset-to-buf init-buf-offset stream bytes-skipped + [is-daemon file file-len offset-to-buf init-buf-offset stream bytes-skipped bytes-read ^ByteBuffer haystack ^bytes needle initial-matches num-matches ^bytes before-bytes] (loop [buf-offset init-buf-offset @@ -665,6 +683,7 @@ offset file-offset (.getCanonicalPath file) +:is-daemon is-daemon :before-bytes before-arg :after-bytes after-arg (let [before-str-to-offset (min (.limit haystack) @@ -721,7 +740,7 @@ context lines. Other information is included to be useful for progressively searching through a file for display in a UI. The search string must grep-max-search-size bytes or fewer when decoded with UTF-8." - [file ^String search-string :num-matches 10 :start-byte-offset 0] + [file ^String search-string :is-daemon false :num-matches 10 :start-byte-offset 0] {:pre [(not (empty? search-string)) (<= (count (.getBytes search-string "UTF-8")) grep-max-search-size)]} (let [zip-file? (.endsWith (.getName file) ".gz") @@ -756,7 +775,9 @@ byte-offset start-byte-offset before-bytes nil] (let [[matches new-byte-offset new-before
[2/3] storm git commit: Merge branch 'STORM-1704-1.x' into 1.x-branch
Merge branch 'STORM-1704-1.x' into 1.x-branch Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a7487765 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a7487765 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a7487765 Branch: refs/heads/1.x-branch Commit: a74877650f9e6d45702c5c7f05e10e44422e4a83 Parents: 3399a4a 449394a Author: Jungtaek Lim Authored: Wed Apr 20 17:26:41 2016 +0900 Committer: Jungtaek Lim Committed: Wed Apr 20 17:26:41 2016 +0900 -- .../clj/org/apache/storm/daemon/logviewer.clj | 77 +--- storm-core/src/ui/public/logviewer_search.html | 4 +- .../logviewer-search-page-template.html | 5 +- .../clj/org/apache/storm/logviewer_test.clj | 57 +-- 4 files changed, 106 insertions(+), 37 deletions(-) --
[2/3] storm git commit: Merge branch 'STORM-1704'
Merge branch 'STORM-1704' Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0ae7a482 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0ae7a482 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0ae7a482 Branch: refs/heads/master Commit: 0ae7a482639bb66dd7020d64dc441e6ee067c726 Parents: 1677de1 55692bb Author: Jungtaek Lim Authored: Wed Apr 20 17:26:01 2016 +0900 Committer: Jungtaek Lim Committed: Wed Apr 20 17:26:01 2016 +0900 -- .../clj/org/apache/storm/daemon/logviewer.clj | 77 +--- storm-core/src/ui/public/logviewer_search.html | 4 +- .../logviewer-search-page-template.html | 5 +- .../clj/org/apache/storm/logviewer_test.clj | 57 +-- 4 files changed, 106 insertions(+), 37 deletions(-) --
storm git commit: STORM-1724 Fill up lacking contents to Metrics documentation
Repository: storm Updated Branches: refs/heads/STORM-1724-0.10.x [created] 96cb4bd5f STORM-1724 Fill up lacking contents to Metrics documentation Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/96cb4bd5 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/96cb4bd5 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/96cb4bd5 Branch: refs/heads/STORM-1724-0.10.x Commit: 96cb4bd5f037bcade002d77391a937ccb6f44ffb Parents: 22b4543 Author: Jungtaek Lim Authored: Fri Apr 22 16:39:17 2016 +0900 Committer: Jungtaek Lim Committed: Fri Apr 22 16:39:17 2016 +0900 -- docs/Metrics.md | 69 +--- 1 file changed, 66 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/96cb4bd5/docs/Metrics.md -- diff --git a/docs/Metrics.md b/docs/Metrics.md index b2521b1..af1805f 100644 --- a/docs/Metrics.md +++ b/docs/Metrics.md @@ -8,7 +8,7 @@ It's used internally to track the numbers you see in the Nimbus UI console: coun ### Metric Types -Metrics have to implement just one method, `getValueAndReset` -- do any remaining work to find the summary value, and reset back to an initial state. For example, the MeanReducer divides the running total by its running count to find the mean, then initializes both values back to zero. +Metrics have to implement [`IMetric`]({{page.git-blob-base}}/storm-core/src/jvm/backtype/storm/metric/api/IMetric.java) which contains just one method, `getValueAndReset` -- do any remaining work to find the summary value, and reset back to an initial state. For example, the MeanReducer divides the running total by its running count to find the mean, then initializes both values back to zero. Storm gives you these metric types: @@ -21,16 +21,79 @@ Storm gives you these metric types: - [MultiReducedMetric]({{page.git-blob-base}}/storm-core/src/jvm/backtype/storm/metric/api/MultiReducedMetric.java) -- a hashmap of reduced metrics. 
-### Metric Consumer +### Metrics Consumer + +You can listen and handle the topology metrics via registering Metrics Consumer to your topology. + +To register metrics consumer to your topology, add to your topology's configuration like: + +```java +conf.registerMetricsConsumer(org.apache.storm.metrics.LoggingMetricsConsumer.class, 1); +``` + +You can refer [Config#registerMetricsConsumer](javadocs/backtype/storm/Config.html#registerMetricsConsumer-java.lang.Class-) and overloaded methods from javadoc. + +Otherwise edit the storm.yaml config file: + +```yaml +topology.metrics.consumer.register: + - class: "backtype.storm.metric.LoggingMetricsConsumer" +parallelism.hint: 1 +``` + +Storm appends MetricsConsumerBolt to your topology per each registered metrics consumer internally, and each MetricsConsumerBolt subscribes to receive metrics from all tasks. The parallelism for that Bolt is set to `parallelism.hint` and `component id` for that Bolt is set to `__metrics_`. If you register same class name more than once, postfix `#` is appended to component id. + +Storm provides built-in metrics consumer for you to try out to see which metrics are provided in your topology. + +* [`LoggingMetricsConsumer`]({{page.git-blob-base}}/storm-core/src/jvm/backtype/storm/metric/LoggingMetricsConsumer.java) -- listens for all metrics and dumps them to log file with TSV (Tab Separated Values). + +Also, Storm exposes the interface [`IMetricsConsumer`]({{page.git-blob-base}}/storm-core/src/jvm/backtype/storm/metric/api/IMetricsConsumer.java) for implementing Metrics Consumer so you can create custom metrics consumers and attach to their topologies, or use other great implementation of Metrics Consumers provided by Storm community. Some of examples are [versign/storm-graphite](https://github.com/verisign/storm-graphite), and [storm-metrics-statsd](https://github.com/endgameinc/storm-metrics-statsd). 
+ +When you implement your own metrics consumer, `argument` is passed to Object when [IMetricsConsumer#prepare](javadocs/backtype/storm/metric/api/IMetricsConsumer.html#prepare-java.util.Map-java.lang.Object-org.apache.storm.task.TopologyContext-org.apache.storm.task.IErrorReporter-) is called, so you need to infer the Java type of configured value on yaml, and do explicit type casting. + +Please keep in mind that MetricsConsumerBolt is just a kind of Bolt, so whole throughput of the topology will go down when registered metrics consumers cannot keep up handling incoming metrics, so you may want to take care of those Bolts like normal Bolts. One of idea to avoid this is making your implementation of Metrics Consumer as `non-blocking` fashion. ### Build your own metric +You can measure your ow
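The documentation text above notes that the registration `argument` reaches `IMetricsConsumer#prepare` as a plain `Object`, so the consumer has to work out the runtime type itself. A minimal sketch of that defensive casting, kept free of Storm dependencies so it runs standalone; the `ConsumerArgument` class, the `intSetting` helper, and the `flush.interval.secs` key are hypothetical and only illustrate the pattern:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for reading a setting out of the untyped argument
// that a metrics consumer receives at prepare time.
final class ConsumerArgument {
    // Returns the integer stored under key, or defaultValue when the argument
    // is not a map or the entry is missing or of an unexpected type.
    static int intSetting(Object argument, String key, int defaultValue) {
        if (!(argument instanceof Map)) {
            return defaultValue;
        }
        Object raw = ((Map<?, ?>) argument).get(key);
        if (raw instanceof Number) {
            return ((Number) raw).intValue(); // YAML integers usually arrive as Number
        }
        if (raw instanceof String) {
            return Integer.parseInt(((String) raw).trim()); // quoted values arrive as String
        }
        return defaultValue;
    }

    public static void main(String[] args) {
        Map<String, Object> argument = new HashMap<>();
        argument.put("flush.interval.secs", 30L);
        System.out.println(intSetting(argument, "flush.interval.secs", 10)); // 30
        System.out.println(intSetting("not-a-map", "flush.interval.secs", 10)); // 10
    }
}
```

Inside a real consumer, a call like this would sit at the top of `prepare`, applied to its `registrationArgument` parameter.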
[storm] Git Push Summary
Repository: storm Updated Branches: refs/heads/STORM-1724-0.10.x [deleted] 96cb4bd5f
[2/3] storm git commit: Merge branch 'STORM-1482' of https://github.com/vesense/storm into STORM-1482
Merge branch 'STORM-1482' of https://github.com/vesense/storm into STORM-1482 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8c591727 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8c591727 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8c591727 Branch: refs/heads/master Commit: 8c591727d3c8eb4384da8c2ea7f26bc7b0c5a02d Parents: 90e4f3e 1c87f4a Author: Jungtaek Lim Authored: Mon Jan 18 09:54:05 2016 +0900 Committer: Jungtaek Lim Committed: Mon Jan 18 09:54:05 2016 +0900 -- .../src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java | 1 + 1 file changed, 1 insertion(+) --
[3/3] storm git commit: Add STORM-1482 to CHANGELOG.md
Add STORM-1482 to CHANGELOG.md Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3a7cfc8e Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3a7cfc8e Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3a7cfc8e Branch: refs/heads/master Commit: 3a7cfc8e7374bccd98cb963b73ecc3776d1aad49 Parents: 8c59172 Author: Jungtaek Lim Authored: Mon Jan 18 09:55:46 2016 +0900 Committer: Jungtaek Lim Committed: Mon Jan 18 09:55:46 2016 +0900 -- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/3a7cfc8e/CHANGELOG.md -- diff --git a/CHANGELOG.md b/CHANGELOG.md index f9e65b6..0dc2679 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## 2.0.0 ## 1.0.0 + * STORM-1482: add missing 'break' for RedisStoreBolt * STORM-1466: Move the org.apache.thrift7 namespace to something correct/sensible * STORM-1470: Applies shading to hadoop-auth, cleaner exclusions * STORM-1467: Switch apache-rat plugin off by default, but enable for Travis-CI
[1/3] storm git commit: add missing break
Repository: storm Updated Branches: refs/heads/master 90e4f3e8a -> 3a7cfc8e7 add missing break Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1c87f4aa Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1c87f4aa Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1c87f4aa Branch: refs/heads/master Commit: 1c87f4aaa5352ec4f68f7dd8dbe0d762c5b2c9d6 Parents: 90e4f3e Author: Xin Wang Authored: Sat Jan 16 21:03:47 2016 +0800 Committer: Xin Wang Committed: Sat Jan 16 21:03:47 2016 +0800 -- .../src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/1c87f4aa/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java -- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java index b74ed1c..c774dc0 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java @@ -94,6 +94,7 @@ public class RedisStoreBolt extends AbstractRedisBolt { case SORTED_SET: jedisCommand.zadd(additionalKey, Double.valueOf(value), key); +break; case HYPER_LOG_LOG: jedisCommand.pfadd(key, value);
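The one-line fix above matters because a Java `case` without `break` falls through into the next label, so before this commit a `SORTED_SET` write would also have run the `HYPER_LOG_LOG` branch. A small self-contained sketch of that behavior; the `FallThroughDemo` class and its recorded command names are illustrative stand-ins, not the bolt's actual code:

```java
import java.util.ArrayList;
import java.util.List;

// Demonstrates switch fall-through with and without the missing break.
final class FallThroughDemo {
    enum DataType { SORTED_SET, HYPER_LOG_LOG, OTHER }

    // Shape of the code before the fix: SORTED_SET falls into HYPER_LOG_LOG.
    static List<String> withoutBreak(DataType type) {
        List<String> calls = new ArrayList<>();
        switch (type) {
            case SORTED_SET:
                calls.add("zadd");
                // no break here, so execution continues into the next case
            case HYPER_LOG_LOG:
                calls.add("pfadd");
                break;
            default:
                break;
        }
        return calls;
    }

    // Shape of the code after the fix: each case ends with break.
    static List<String> withBreak(DataType type) {
        List<String> calls = new ArrayList<>();
        switch (type) {
            case SORTED_SET:
                calls.add("zadd");
                break;
            case HYPER_LOG_LOG:
                calls.add("pfadd");
                break;
            default:
                break;
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(withoutBreak(DataType.SORTED_SET)); // [zadd, pfadd]
        System.out.println(withBreak(DataType.SORTED_SET));    // [zadd]
    }
}
```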
[31/36] storm git commit: Merge branch 'STORM-1466' of github.com:knusbaum/incubator-storm
Merge branch 'STORM-1466' of github.com:knusbaum/incubator-storm Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1c45fdf4 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1c45fdf4 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1c45fdf4 Branch: refs/heads/1.x-branch Commit: 1c45fdf482d4a8eeb1e11502a7641f19f246e2e3 Parents: 396f653 ab11ce9 Author: P. Taylor Goetz Authored: Fri Jan 15 12:07:02 2016 -0500 Committer: P. Taylor Goetz Committed: Fri Jan 15 12:07:02 2016 -0500 -- storm-core/pom.xml| 2 +- .../src/main/java/org/apache/storm/hack/StormShadeRequest.java| 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/1c45fdf4/storm-core/pom.xml --
[03/36] storm git commit: STORM-1468: remove {master}/docs
http://git-wip-us.apache.org/repos/asf/storm/blob/2f5c31d2/docs/images/spout.svg -- diff --git a/docs/images/spout.svg b/docs/images/spout.svg deleted file mode 100644 index 0105957..0000000 --- a/docs/images/spout.svg +++ /dev/null @@ -1,833 +0,0 @@ [833 deleted lines of Inkscape SVG markup omitted; the drawing (spout.svg, 652.63159 x 196.05263) carried only repeated "Tuple" text labels.]
[28/36] storm git commit: add STORM-1467 to changelog
add STORM-1467 to changelog

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/45726bbd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/45726bbd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/45726bbd

Branch: refs/heads/1.x-branch
Commit: 45726bbd18271a3a0548dfd36c89a804d350a317
Parents: b21b686
Author: P. Taylor Goetz
Authored: Fri Jan 15 11:48:40 2016 -0500
Committer: P. Taylor Goetz
Committed: Fri Jan 15 11:48:40 2016 -0500

--
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
--

http://git-wip-us.apache.org/repos/asf/storm/blob/45726bbd/CHANGELOG.md
--
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2898bdd..5ff26e1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,6 @@
 ## 2.0.0
 ## 1.0.0
+ * STORM-1467: Switch apache-rat plugin off by default, but enable for Travis-CI
  * STORM-1468: move documentation to asf-site branch
  * STORM-1199: HDFS Spout Implementation.
  * STORM-1453: nimbus.clj/wait-for-desired-code-replication prints wrong log message
[05/36] storm git commit: STORM-1468: remove {master}/docs
http://git-wip-us.apache.org/repos/asf/storm/blob/2f5c31d2/docs/images/bolt.svg
--
diff --git a/docs/images/bolt.svg b/docs/images/bolt.svg
deleted file mode 100644
index 5b8adb3..000
--- a/docs/images/bolt.svg
+++ /dev/null
@@ -1,743 +0,0 @@
-http://www.openswatchbook.org/uri/2009/osb"
-   xmlns:dc="http://purl.org/dc/elements/1.1/"
-   xmlns:cc="http://creativecommons.org/ns#"
-   xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
-   xmlns:svg="http://www.w3.org/2000/svg"
-   xmlns="http://www.w3.org/2000/svg"
-   xmlns:xlink="http://www.w3.org/1999/xlink"
-   xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd"
-   xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape"
-   width="606.33069"
-   height="195.39473"
-   id="svg2"
-   version="1.1"
-   inkscape:version="0.48.5 r10040"
-   sodipodi:docname="bolt.svg"
-   inkscape:export-filename="/Users/evans/src/storm/docs/images/bolt.png"
-   inkscape:export-xdpi="94.88485"
-   inkscape:export-ydpi="94.88485">
-image/svg+xml
-http://purl.org/dc/dcmitype/StillImage" />
[remaining SVG markup not preserved in the archive; surviving text content: the label "Tuple", repeated]

http://git-wip-us.apache.org/repos/asf/storm/blob/2f5c31d2/docs/images/bullet.gif
--
diff --git a/docs/images/bullet.gif b/docs/images/bullet.gif
deleted file mode 100644
index 45bb956..000
Binary files a/docs/images/bullet.gif and /dev/null differ

http://git-wip-us.apache.org/repos/asf/storm/blob/2f5c31d2/docs/images/download.png
--
diff --git a/docs/images/download.png b/docs/images/download.png
deleted file mode 100644
index 5e07c78..000
Binary files a/docs/images/download.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/storm/blob/2f5c31d2/docs/images/incubator-logo.png
--
diff --git a/docs/images/incubator-logo.png b/docs/images/incubator-logo.png
deleted file mode 100644
index 33ca7f6..000
Binary files a/docs/images/incubator-logo.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/storm/blob/2f5c31d2/docs/images/logocontest/abartos/stationery_mockup.jpg
--
diff --git a/docs/images/logocontest/abartos/stationery_mockup.jpg b/docs/images/logocontest/abartos/stationery_mockup.jpg
deleted file mode 100644
index d2635d5..000
Binary files a/docs/images/logocontest/abartos/stationery_mockup.jpg and /dev/null differ

http://git-wip-us.apache.org/repos/asf/storm/blob/2f5c31d2/docs/images/logocontest/abartos/storm_logo.png
--
diff --git a/docs/images/logocontest/abartos/storm_logo.png b/docs/images/logocontest/abartos/storm_logo.png
deleted file mode 100644
index 743b694..000
Binary files a/docs/images/logocontest/abartos/storm_logo.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/storm/blob/2f5c31d2/docs/images/logocontest/abartos/storm_logo2.png
--
diff --git a/docs/images/logocontest/abartos/storm_logo2.png b/docs/images/logocontest/abartos/storm_logo2.png
deleted file mode 100644
index a487d28..000
Binary files a/docs/images/logocontest/abartos/storm_logo2.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/storm/blob/2f5c31d2/docs/images/logocontest/abartos/storm_logo3.png
--
diff --git a/docs/images/logocontest/abartos/storm_logo3.png b/docs/images/logocontest/abartos/storm_logo3.png
deleted file mode 100644
index ddb4591..000
Binary files a/docs/images/logocontest/abartos/storm_logo3.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/storm/blob/2f5c31d2/docs/images/logocontest/cboustead/storm_logo.png
--
diff
[19/36] storm git commit: STORM-1468: remove {master}/docs
http://git-wip-us.apache.org/repos/asf/storm/blob/2f5c31d2/docs/assets/css/bootstrap.css
--
diff --git a/docs/assets/css/bootstrap.css b/docs/assets/css/bootstrap.css
deleted file mode 100644
index c6f3d21..000
--- a/docs/assets/css/bootstrap.css
+++ /dev/null
@@ -1,6332 +0,0 @@
-/*!
- * Bootstrap v3.3.1 (http://getbootstrap.com)
- * Copyright 2011-2014 Twitter, Inc.
- * Licensed under MIT (https://github.com/twbs/bootstrap/blob/master/LICENSE)
- */
-
-/*! normalize.css v3.0.2 | MIT License | git.io/normalize */
-html {
-  font-family: sans-serif;
-  -webkit-text-size-adjust: 100%;
-  -ms-text-size-adjust: 100%;
-}
-body {
-  margin: 0;
-}
-article,
-aside,
-details,
-figcaption,
-figure,
-footer,
-header,
-hgroup,
-main,
-menu,
-nav,
-section,
-summary {
-  display: block;
-}
-audio,
-canvas,
-progress,
-video {
-  display: inline-block;
-  vertical-align: baseline;
-}
-audio:not([controls]) {
-  display: none;
-  height: 0;
-}
-[hidden],
-template {
-  display: none;
-}
-a {
-  background-color: transparent;
-}
-a:active,
-a:hover {
-  outline: 0;
-}
-abbr[title] {
-  border-bottom: 1px dotted;
-}
-b,
-strong {
-  font-weight: bold;
-}
-dfn {
-  font-style: italic;
-}
-h1 {
-  margin: .67em 0;
-  font-size: 2em;
-}
-mark {
-  color: #000;
-  background: #ff0;
-}
-small {
-  font-size: 80%;
-}
-sub,
-sup {
-  position: relative;
-  font-size: 75%;
-  line-height: 0;
-  vertical-align: baseline;
-}
-sup {
-  top: -.5em;
-}
-sub {
-  bottom: -.25em;
-}
-img {
-  border: 0;
-}
-svg:not(:root) {
-  overflow: hidden;
-}
-figure {
-  margin: 1em 40px;
-}
-hr {
-  height: 0;
-  -webkit-box-sizing: content-box;
-  -moz-box-sizing: content-box;
-  box-sizing: content-box;
-}
-pre {
-  overflow: auto;
-}
-code,
-kbd,
-pre,
-samp {
-  font-family: monospace, monospace;
-  font-size: 1em;
-}
-button,
-input,
-optgroup,
-select,
-textarea {
-  margin: 0;
-  font: inherit;
-  color: inherit;
-}
-button {
-  overflow: visible;
-}
-button,
-select {
-  text-transform: none;
-}
-button,
-html input[type="button"],
-input[type="reset"],
-input[type="submit"] {
-  -webkit-appearance: button;
-  cursor: pointer;
-}
-button[disabled],
-html input[disabled] {
-  cursor: default;
-}
-button::-moz-focus-inner,
-input::-moz-focus-inner {
-  padding: 0;
-  border: 0;
-}
-input {
-  line-height: normal;
-}
-input[type="checkbox"],
-input[type="radio"] {
-  -webkit-box-sizing: border-box;
-  -moz-box-sizing: border-box;
-  box-sizing: border-box;
-  padding: 0;
-}
-input[type="number"]::-webkit-inner-spin-button,
-input[type="number"]::-webkit-outer-spin-button {
-  height: auto;
-}
-input[type="search"] {
-  -webkit-box-sizing: content-box;
-  -moz-box-sizing: content-box;
-  box-sizing: content-box;
-  -webkit-appearance: textfield;
-}
-input[type="search"]::-webkit-search-cancel-button,
-input[type="search"]::-webkit-search-decoration {
-  -webkit-appearance: none;
-}
-fieldset {
-  padding: .35em .625em .75em;
-  margin: 0 2px;
-  border: 1px solid #c0c0c0;
-}
-legend {
-  padding: 0;
-  border: 0;
-}
-textarea {
-  overflow: auto;
-}
-optgroup {
-  font-weight: bold;
-}
-table {
-  border-spacing: 0;
-  border-collapse: collapse;
-}
-td,
-th {
-  padding: 0;
-}
-/*! Source: https://github.com/h5bp/html5-boilerplate/blob/master/src/css/main.css */
-@media print {
-  *,
-  *:before,
-  *:after {
-    color: #000 !important;
-    text-shadow: none !important;
-    background: transparent !important;
-    -webkit-box-shadow: none !important;
-    box-shadow: none !important;
-  }
-  a,
-  a:visited {
-    text-decoration: underline;
-  }
-  a[href]:after {
-    content: " (" attr(href) ")";
-  }
-  abbr[title]:after {
-    content: " (" attr(title) ")";
-  }
-  a[href^="#"]:after,
-  a[href^="javascript:"]:after {
-    content: "";
-  }
-  pre,
-  blockquote {
-    border: 1px solid #999;
-
-    page-break-inside: avoid;
-  }
-  thead {
-    display: table-header-group;
-  }
-  tr,
-  img {
-    page-break-inside: avoid;
-  }
-  img {
-    max-width: 100% !important;
-  }
-  p,
-  h2,
-  h3 {
-    orphans: 3;
-    widows: 3;
-  }
-  h2,
-  h3 {
-    page-break-after: avoid;
-  }
-  select {
-    background: #fff !important;
-  }
-  .navbar {
-    display: none;
-  }
-  .btn > .caret,
-  .dropup > .btn > .caret {
-    border-top-color: #000 !important;
-  }
-  .label {
-    border: 1px solid #000;
-  }
-  .table {
-    border-collapse: collapse !important;
-  }
-  .table td,
-  .table th {
-    background-color: #fff !important;
-  }
-  .table-bordered th,
-  .table-bordered td {
-    border: 1px solid #ddd !important;
-  }
-}
-@font-face {
-  font-family: 'Glyphicons Halflings';
-
-  src: url('../fonts/glyphicons-halflings-regular.eot');
-  src: url('../fonts/glyphicons-halflings-regular.eot?#iefix') format('embedded-opentype'), url('../fonts/glyphicons-halflings-regular.woff') format('woff'), url('../fonts/glyphi