[GitHub] incubator-gearpump issue #247: [GEARPUMP-377] Add TwitterSource and examples
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/247 +1 ---
[GitHub] incubator-gearpump issue #247: [GEARPUMP-377] Add TwitterSource and examples
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/247 What will happen if the app has multiple source tasks? Will each task have the same output? ---
[GitHub] incubator-gearpump issue #244: [GEARPUMP-373] Don't create hbase configurati...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/244 +1 ---
[GitHub] incubator-gearpump issue #243: [GEARPUMP-339] Fix unknown tag parameter
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/243 +1 ---
[GitHub] incubator-gearpump pull request #227: [GEARPUMP-350] Fix the not started app...
Github user huafengw closed the pull request at: https://github.com/apache/incubator-gearpump/pull/227 ---
[GitHub] incubator-gearpump issue #240: [GEARPUMP-344] Fix broken links in README
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/240 +1 ---
[GitHub] incubator-gearpump issue #239: [GEARPUMP-366] Create JarStore dir if not exi...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/239 +1 ---
[GitHub] incubator-gearpump issue #238: [GEARPUMP-350] Fix app clock not started
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/238 +1 ---
[GitHub] incubator-gearpump issue #233: [GEARPUMP-361] Build gearpump docker image lo...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/233 +1 ---
[GitHub] incubator-gearpump pull request #236: [GEARPUMP-362] Fix bin/gear info shows...
GitHub user huafengw opened a pull request: https://github.com/apache/incubator-gearpump/pull/236 [GEARPUMP-362] Fix bin/gear info shows nothing when there are apps running Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the commit message is formatted like: `[GEARPUMP-] Meaningful description of pull request` - [ ] Make sure tests pass via `sbt clean test`. - [ ] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality. You can merge this pull request into a Git repository by running: $ git pull https://github.com/huafengw/incubator-gearpump info Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-gearpump/pull/236.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #236 commit 65cf99014279198cae9826dc057f79419c035520 Author: huafengw <fvunic...@gmail.com> Date: 2017-10-28T03:25:54Z [GEARPUMP-362] Fix bin/gear info shows nothing when there are apps running ---
[GitHub] incubator-gearpump issue #233: [GEARPUMP-361] Build gearpump docker image lo...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/233
1. I have to build with explicit proxies, like `docker build --build-arg HTTP_PROXY=... --build-arg HTTPS_PROXY=...`. Otherwise, the build step `fetch http://dl-cdn.alpinelinux.org/alpine/v3.6/main/x86_64/APKINDEX.tar.gz` always fails.
2. Please add the following change to `integrationtest/docker/gearpump/Dockerfile`:
```
-RUN apk add --no-cache python && \
+RUN apk add --update curl && \
+apk add --no-cache python && \
```
3. After resolving these issues, the integration test still can't proceed. Here is the error log:
```
[DEBUG] [15:21:49.279] [Docker$] Container master0>> curl http://master0:8090/api/v1.0/master
[DEBUG] [15:21:49.279] [Docker$] INPUT==>>
[DEBUG] [15:21:49.279] [ShellExec$] EXEC master0 => `docker exec master0 curl -s --cookie cookie.txt http://master0:8090/api/v1.0/master`
[DEBUG] [15:21:49.636] [ShellExec$] EXEC master0 <= ` Redirection Redirect ` exit 0
[DEBUG] [15:21:49.636] [Docker$] <<==OUTPUT
[DEBUG] [15:21:49.636] [Docker$] Redirection Redirect
[DEBUG] [15:21:49.637] [Docker$] <<Command END. Container master0, curl http://master0:8090/api/v1.0/master
[ERROR] [15:21:49.637] [RestClient] Failed to decode Rest response to MasterData
[ERROR] [15:21:49.637] [Util$] Failed due to (false == cluster running), retrying for the 15 times...
```
---
[GitHub] incubator-gearpump pull request #227: [GEARPUMP-350] Fix the not started app...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/227#discussion_r147065810
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala ---
@@ -361,10 +358,11 @@ class TaskActor(
   }
   private def getSubscriptionWatermark(subs: List[(Int, Subscription)], wmk: Instant): Instant = {
+    val wmkInMilli = wmk.toEpochMilli
--- End diff --
To avoid multiple `toEpochMilli` calls. ---
[GitHub] incubator-gearpump pull request #234: [GEARPUMP-359] Fix OutputWatermark adv...
GitHub user huafengw opened a pull request: https://github.com/apache/incubator-gearpump/pull/234 [GEARPUMP-359] Fix OutputWatermark advancing logic in Subscription Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the commit message is formatted like: `[GEARPUMP-] Meaningful description of pull request` - [ ] Make sure tests pass via `sbt clean test`. - [ ] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality. You can merge this pull request into a Git repository by running: $ git pull https://github.com/huafengw/incubator-gearpump subs Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-gearpump/pull/234.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #234 commit 311e09b28ebe8ac8525b447e831615065dcc541c Author: huafengw <fvunic...@gmail.com> Date: 2017-10-26T05:52:39Z [GEARPUMP-359] Fix OutputWatermark advancing logic in Subscription ---
[GitHub] incubator-gearpump issue #233: [WIP] Fix integration test
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/233 Verified on CentOS: the updated image launches successfully. ---
[GitHub] incubator-gearpump pull request #232: [GEARPUMP-358] Decrease the frequency ...
GitHub user huafengw opened a pull request: https://github.com/apache/incubator-gearpump/pull/232 [GEARPUMP-358] Decrease the frequency of watermark calculation Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the commit message is formatted like: `[GEARPUMP-] Meaningful description of pull request` - [ ] Make sure tests pass via `sbt clean test`. - [ ] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality. You can merge this pull request into a Git repository by running: $ git pull https://github.com/huafengw/incubator-gearpump regre Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-gearpump/pull/232.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #232 commit 3456bfcfe1fa643f37dedbb88c6e4090d6df5823 Author: huafengw <fvunic...@gmail.com> Date: 2017-10-18T11:19:44Z [GEARPUMP-358] Decrease the frequency of watermark calculation ---
[GitHub] incubator-gearpump issue #231: [GEARPUMP-355] Fix YarnAppMaster address reso...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/231 Hi @titikakatoo, thanks for your response. I'd like to know how you set up your cluster. I just set up a single-node secured YARN cluster, and the command you posted works fine there. How many nodes are in your cluster, and where do you run the yarnClient command: on one of those nodes? Also, is there any special configuration I should be aware of? ---
[GitHub] incubator-gearpump issue #231: [GEARPUMP-355] Fix YarnAppMaster address reso...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/231 Hi @titikakatoo, thanks for your contribution! Can you briefly describe how to reproduce the problem you found? I'm setting up a secured cluster to verify your pull request. Also, does this pull request depend on #230? ---
[GitHub] incubator-gearpump pull request #227: [GEARPUMP-350] Fix the not started app...
GitHub user huafengw opened a pull request: https://github.com/apache/incubator-gearpump/pull/227 [GEARPUMP-350] Fix the not started app clock Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the commit message is formatted like: `[GEARPUMP-] Meaningful description of pull request` - [ ] Make sure tests pass via `sbt clean test`. - [ ] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality. You can merge this pull request into a Git repository by running: $ git pull https://github.com/huafengw/incubator-gearpump clock Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-gearpump/pull/227.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #227 commit 31c124455d362f853aa122970fd2e8ad925a8976 Author: huafengw <fvunic...@gmail.com> Date: 2017-09-29T08:02:59Z [GEARPUMP-350] Fix the not started app clock ---
[GitHub] incubator-gearpump issue #227: [GEARPUMP-350] Fix the not started app clock
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/227 Verified the streaming examples, including complex DAG, SOL, WordCount DSL and the Java WordCount. ---
[GitHub] incubator-gearpump pull request #226: [WIP] Don't use window runner for non-...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/226#discussion_r141637090
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala ---
@@ -73,6 +73,25 @@ case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE],
   }
 }
+class DirectWindowRunner[IN, OUT](fnRunner: FunctionRunner[IN, OUT])
--- End diff --
Maybe rename `WindowRunner` to something more generic? Then `DirectWindowRunner` could be a non-window runner. ---
[GitHub] incubator-gearpump pull request #223: [GEARPUMP-349] Optimize Graph topologi...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/223#discussion_r139345072
--- Diff: core/src/main/scala/org/apache/gearpump/util/Graph.scala ---
@@ -318,12 +360,8 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
    * http://www.drdobbs.com/database/topological-sorting/184410262
    */
   def topologicalOrderWithCirclesIterator: Iterator[N] = {
-    if (hasCycle()) {
-      val topo = getAcyclicCopy().topologicalOrderIterator
-      topo.flatMap(_.sortBy(_indexs(_)).iterator)
-    } else {
-      topologicalOrderIterator
-    }
+    val topo = getAcyclicCopy().topologicalOrderIterator
+    topo.flatMap(_.sortBy(indexs(_)).iterator)
--- End diff --
It's not. `topo` is an `Iterator[List[N]]`: the items of `topo` come out in sorted order, but each item is itself a list, and the nodes inside that list are not sorted. ---
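A minimal Scala sketch of the point above. It assumes the acyclic copy yields its node groups (as `List[N]`) already in topological order. `StableFlattenSketch`, `flattenStable`, and the `index` parameter are hypothetical names, with `index` standing in for the graph's insertion-index map. The order of the groups is kept as-is, and only the nodes inside each group are sorted, which is what makes the flattened output stable.

```scala
object StableFlattenSketch {
  // Flattens groups of nodes (assumed to arrive in topological order) into a
  // single iterator. Group order is preserved; nodes inside each group are
  // sorted by their insertion index so repeated runs give the same output.
  def flattenStable[N](groups: Iterator[List[N]], index: Map[N, Int]): Iterator[N] =
    groups.flatMap(group => group.sortBy(index).iterator)
}
```

Take groups `List(c, a)` then `List(d, b)` with indices a=0, b=1, c=2, d=3. They flatten to `a, c, b, d`. `b` still comes after `c` because its group comes later.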
[GitHub] incubator-gearpump pull request #223: [GEARPUMP-349] Optimize Graph topologi...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/223#discussion_r138880775
--- Diff: core/src/main/scala/org/apache/gearpump/util/Graph.scala ---
@@ -81,28 +86,35 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
    * out degree
    */
   def outDegreeOf(node: N): Int = {
-    edges.count(_._1 == node)
+    outgoingEdgesOf(node).size
   }
   /**
    * in degree
    */
   def inDegreeOf(node: N): Int = {
-    edges.count(_._3 == node)
+    incomingEdgesOf(node).size
   }
   /**
    * out going edges.
    */
   def outgoingEdgesOf(node: N): List[(N, E, N)] = {
-    edges.filter(_._1 == node)
+    _outEdges.getOrElse(node, List.empty)
   }
   /**
    * incoming edges.
    */
   def incomingEdgesOf(node: N): List[(N, E, N)] = {
-    edges.filter(_._3 == node)
+    _inEdges.getOrElse(node, List.empty)
+  }
+
+  /**
+   * adjacent vertices.
+   */
+  def adjacentVertices(node: N): List[N] = {
--- End diff --
It's a directed graph. ---
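The change above replaces linear scans over `edges` with lookups in per-node maps. Below is a minimal sketch of that idea on a hypothetical immutable `DiGraph` class. It is not Gearpump's `Graph`, which is mutable and would have to keep such maps in sync on every change. The sketch also assumes that, because the graph is directed, `adjacentVertices` returns only the targets of outgoing edges.

```scala
object AdjacencySketch {
  // Hypothetical immutable directed graph with precomputed adjacency maps.
  final class DiGraph[N, E](val vertices: List[N], val edges: List[(N, E, N)]) {
    // Built once: node -> edges leaving it / entering it. groupBy keeps the
    // original relative order of edges within each group.
    private val outEdges: Map[N, List[(N, E, N)]] = edges.groupBy(_._1)
    private val inEdges: Map[N, List[(N, E, N)]] = edges.groupBy(_._3)

    def outgoingEdgesOf(node: N): List[(N, E, N)] = outEdges.getOrElse(node, List.empty)
    def incomingEdgesOf(node: N): List[(N, E, N)] = inEdges.getOrElse(node, List.empty)

    // Degrees become a map lookup plus the size of one node's edge list,
    // instead of a count over every edge in the graph.
    def outDegreeOf(node: N): Int = outgoingEdgesOf(node).size
    def inDegreeOf(node: N): Int = incomingEdgesOf(node).size

    // Assumption: in a directed graph, "adjacent" means successors only.
    def adjacentVertices(node: N): List[N] = outgoingEdgesOf(node).map(_._3)
  }
}
```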
[GitHub] incubator-gearpump pull request #223: [GEARPUMP-349] Optimize Graph topologi...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/223#discussion_r138881132
--- Diff: core/src/main/scala/org/apache/gearpump/util/Graph.scala ---
@@ -243,13 +259,34 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
    * The node returned by Iterator is stable sorted.
    */
   def topologicalOrderIterator: Iterator[N] = {
-    val newGraph = copy
-    var output = List.empty[N]
-
-    while (!newGraph.isEmpty) {
-      output ++= newGraph.removeZeroInDegree
+    tryTopologicalOrderIterator.get
--- End diff --
I think throwing an exception is better than an infinite loop. ---
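To illustrate why failing beats looping forever, here is a sketch of Kahn's algorithm that returns a `Try`, in the spirit of `tryTopologicalOrderIterator` in the diff. It is not Gearpump's implementation. The names `TopoSketch` and `tryTopologicalOrder`, and the `(from, to)` edge encoding, are hypothetical. Each round emits every remaining zero-in-degree node in the original vertex order, presumably much like the removed `removeZeroInDegree` loop. When a round finds no such node while nodes remain, the graph must contain a cycle, so the sketch returns a `Failure` instead of spinning. A cycle check then reduces to `isFailure`, which is what the `hasCycle()` diff in this thread does.

```scala
import scala.util.Try

object TopoSketch {
  // Kahn's algorithm, processed level by level for a stable order.
  def tryTopologicalOrder[N](vertices: List[N], edges: List[(N, N)]): Try[List[N]] = Try {
    val inDegree = scala.collection.mutable.Map.empty[N, Int]
    vertices.foreach(v => inDegree(v) = 0)
    edges.foreach { case (_, to) => inDegree(to) += 1 }
    val successors: Map[N, List[N]] =
      edges.groupBy(_._1).map { case (from, es) => from -> es.map(_._2) }

    var remaining = vertices
    val output = List.newBuilder[N]
    while (remaining.nonEmpty) {
      // `ready` keeps the original vertex order, so the output is stable.
      val (ready, blocked) = remaining.partition(v => inDegree(v) == 0)
      if (ready.isEmpty) {
        // No progress is possible: fail fast rather than loop forever.
        throw new IllegalStateException(s"Graph has a cycle among: ${blocked.mkString(", ")}")
      }
      ready.foreach { v =>
        output += v
        successors.getOrElse(v, Nil).foreach(s => inDegree(s) -= 1)
      }
      remaining = blocked
    }
    output.result()
  }

  def hasCycle[N](vertices: List[N], edges: List[(N, N)]): Boolean =
    tryTopologicalOrder(vertices, edges).isFailure
}
```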
[GitHub] incubator-gearpump pull request #223: [GEARPUMP-349] Optimize Graph topologi...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/223#discussion_r138880939
--- Diff: core/src/main/scala/org/apache/gearpump/util/Graph.scala ---
@@ -165,7 +181,7 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
    * edges connected to node
    */
   def edgesOf(node: N): List[(N, E, N)] = {
-    (incomingEdgesOf(node) ++ outgoingEdgesOf(node)).toSet[(N, E, N)].toList.sortBy(_indexs(_))
+    (incomingEdgesOf(node) ++ outgoingEdgesOf(node)).distinct.sortBy(_indexs(_))
--- End diff --
```
// This is used to ensure the output of this Graph is always stable
// Like method vertices(), or edges()
private var _indexs = Map.empty[Any, Int]
```
---
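The diff swaps `.toSet[(N, E, N)].toList` for `.distinct`. Both drop the duplicate that a self-loop edge produces, because such an edge appears in both the incoming and the outgoing list. `distinct` also keeps first-occurrence order, and the `sortBy` on insertion indices (the role of `_indexs` quoted above) fixes the final order. Here is a small self-contained sketch. `EdgesOfSketch` and its locally built `index` map are hypothetical stand-ins for the graph's own state.

```scala
object EdgesOfSketch {
  // Combines the incoming and outgoing edges of `node`, removes the duplicate
  // produced by a self-loop, and orders the result by each edge's position in
  // the original edge list so the output is stable.
  def edgesOf[N, E](node: N, edges: List[(N, E, N)]): List[(N, E, N)] = {
    val index: Map[(N, E, N), Int] = edges.zipWithIndex.toMap
    val incoming = edges.filter(_._3 == node)
    val outgoing = edges.filter(_._1 == node)
    (incoming ++ outgoing).distinct.sortBy(index)
  }
}
```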
[GitHub] incubator-gearpump pull request #223: [GEARPUMP-349] Optimize Graph topologi...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/223#discussion_r138882103
--- Diff: core/src/main/scala/org/apache/gearpump/util/Graph.scala ---
@@ -355,19 +393,7 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
    * check whether there is a loop
    */
   def hasCycle(): Boolean = {
-    @tailrec
-    def detectCycle(graph: Graph[N, E]): Boolean = {
-      if (graph.edges.isEmpty) {
-        false
-      } else if (graph.vertices.nonEmpty && !graph.vertices.exists(graph.inDegreeOf(_) == 0)) {
-        true
-      } else {
-        graph.removeZeroInDegree
-        detectCycle(graph)
-      }
-    }
-
-    detectCycle(copy)
+    tryTopologicalOrderIterator.isFailure
--- End diff --
The graph is mutable, so if we cached the result we would also have to check whether the graph has changed since. ---
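Here is one way to cache the cycle check on a mutable graph, sketched under assumptions and not taken from Gearpump. A hypothetical `MutableGraph` bumps a `version` counter on every mutation, and it reuses the cached answer only while that version is unchanged. The `computations` counter exists only to make the caching observable.

```scala
object CachedCycleCheckSketch {
  // Hypothetical mutable graph that caches an expensive cycle check and
  // invalidates the cache whenever the graph changes.
  final class MutableGraph[N] {
    private var vertices = Vector.empty[N]
    private var edges = Vector.empty[(N, N)]
    private var version = 0L
    private var cached: Option[(Long, Boolean)] = None
    var computations = 0 // number of uncached computations, for illustration

    def addVertex(v: N): Unit = { vertices :+= v; version += 1 }
    def addEdge(from: N, to: N): Unit = { edges :+= (from -> to); version += 1 }

    def hasCycle: Boolean = cached match {
      case Some((v, result)) if v == version => result // graph unchanged
      case _ =>
        val result = computeHasCycle()
        cached = Some((version, result))
        result
    }

    // Repeatedly removes nodes with no incoming edge from a remaining node.
    // Whatever is left lies on a cycle or downstream of one.
    private def computeHasCycle(): Boolean = {
      computations += 1
      var remaining = vertices.toSet
      var progress = true
      while (progress) {
        val sources = remaining.filter(n => !edges.exists { case (f, t) => t == n && remaining(f) })
        progress = sources.nonEmpty
        remaining --= sources
      }
      remaining.nonEmpty
    }
  }
}
```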
[GitHub] incubator-gearpump pull request #223: [GEARPUMP-349] Optimize Graph topologi...
GitHub user huafengw opened a pull request: https://github.com/apache/incubator-gearpump/pull/223 [GEARPUMP-349] Optimize Graph topologicalOrderIterator performance Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the commit message is formatted like: `[GEARPUMP-] Meaningful description of pull request` - [ ] Make sure tests pass via `sbt clean test`. - [ ] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality. You can merge this pull request into a Git repository by running: $ git pull https://github.com/huafengw/incubator-gearpump graph Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-gearpump/pull/223.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #223 commit 2673c99cb26a8a0733af772f01748f818ea08857 Author: huafengw <fvunic...@gmail.com> Date: 2017-09-05T08:59:49Z [GEARPUMP-349] Optimize Graph topologicalOrderIterator performance ---
[GitHub] incubator-gearpump pull request #222: [GEARPUMP-348] Allow application total...
GitHub user huafengw opened a pull request: https://github.com/apache/incubator-gearpump/pull/222 [GEARPUMP-348] Allow application total number of retries to be configurable Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the commit message is formatted like: `[GEARPUMP-] Meaningful description of pull request` - [ ] Make sure tests pass via `sbt clean test`. - [ ] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality. You can merge this pull request into a Git repository by running: $ git pull https://github.com/huafengw/incubator-gearpump restart Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-gearpump/pull/222.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #222 commit 1ff5ac05904320fd64580529897822524edb349a Author: huafengw <fvunic...@gmail.com> Date: 2017-09-01T06:21:19Z [GEARPUMP-348] Allow application total number of retries to be configurable --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-gearpump issue #221: [GEARPUMP-217] Merge sql branch into master
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/221 +1 ---
[GitHub] incubator-gearpump issue #215: [GEARPUMP-343] Fix typo of EmbeddedRuntimeEnv...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/215
```
The job exceeded the maximum time limit for jobs, and has been terminated.
```
---
[GitHub] incubator-gearpump pull request #215: [GEARPUMP-343] Fix typo of EmbeddedRun...
GitHub user huafengw opened a pull request: https://github.com/apache/incubator-gearpump/pull/215 [GEARPUMP-343] Fix typo of EmbeddedRuntimeEnvironment Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the commit message is formatted like: `[GEARPUMP-] Meaningful description of pull request` - [ ] Make sure tests pass via `sbt clean test`. - [ ] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality. You can merge this pull request into a Git repository by running: $ git pull https://github.com/huafengw/incubator-gearpump minor Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-gearpump/pull/215.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #215 commit c6b11b56deb4cc16f3aa96013b453a7f70e646e6 Author: huafengw <fvunic...@gmail.com> Date: 2017-08-17T02:37:47Z [GEARPUMP-343] Fix typo of EmbeddedRuntimeEnvironment ---
[GitHub] incubator-gearpump issue #214: [GEARPUMP-341] Update processing watermark in...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/214 +1 ---
[GitHub] incubator-gearpump issue #213: [GEARPUMP-339] Add ScalaDoc to window api and...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/213 +1 ---
[GitHub] incubator-gearpump pull request #213: [GEARPUMP-339] Add ScalaDoc to window ...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/213#discussion_r132863041
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala ---
@@ -137,11 +156,14 @@ class DefaultWindowRunner[IN, OUT](
           }
           onTrigger(outputs, newWmk)
         } else {
-          // minimum of end of last triggered window and start of first un-triggered window
+          // The output watermark is the minimum of end of last triggered window
+          // and start of first un-triggered window
           TriggeredOutputs(outputs, TaskUtil.min(wmk, firstWin.startTime))
         }
       } else {
+        // All windows have been triggered.
         if (time == Watermark.MAX) {
+          // This means there will no more inputs so it's safe to advance to the maximum watermark.
--- End diff --
there will be no more ---
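The rule stated in the revised comment can be sketched as a pure function. This is a simplified, hypothetical model, not `DefaultWindowRunner`'s actual code. `java.time.Instant.MAX` stands in for `Watermark.MAX`. The final case (every window has fired but input is not finished, so hold at the end of the last fired window) is an assumption the diff does not show.

```scala
import java.time.Instant

object OutputWatermarkSketch {
  // lastTriggeredEnd: end of the last window that has fired.
  // firstPending:     start of the earliest window that has not fired yet
  //                   (None if every window has fired).
  // inputWatermark:   the watermark received from upstream.
  def outputWatermark(lastTriggeredEnd: Instant,
                      firstPending: Option[Instant],
                      inputWatermark: Instant): Instant =
    firstPending match {
      case Some(start) =>
        // Minimum of the two bounds: a pending window may still emit results
        // for times at or after its start.
        if (start.isBefore(lastTriggeredEnd)) start else lastTriggeredEnd
      case None if inputWatermark == Instant.MAX =>
        // No more input can arrive, so it is safe to jump to the maximum.
        Instant.MAX
      case None =>
        // Assumption: otherwise hold at the end of the last fired window.
        lastTriggeredEnd
    }
}
```

The first case covers overlapping (for example, sliding) windows. A pending window can start before the last fired window ends, and then the output watermark must stay at that earlier start.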
[GitHub] incubator-gearpump issue #213: [GEARPUMP-339] Add ScalaDoc to window api and...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/213 No unit test failed, so where does the failure come from? ---
[GitHub] incubator-gearpump pull request #212: [GEARPUMP-339] Add ScalaDoc to Streami...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/212#discussion_r131545040
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala ---
@@ -62,6 +62,17 @@ class StreamApp(
     val dag = planner.plan(graph)
     StreamApplication(name, dag, userConfig)
   }
+
+  def source[T](dataSource: DataSource, parallelism: Int = 1,
+      conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = {
+    implicit val sourceOp = DataSourceOp(dataSource, parallelism, description, conf)
--- End diff --
Does it have to be implicit? ---
[GitHub] incubator-gearpump issue #212: [GEARPUMP-339] Add ScalaDoc to Streaming DSL
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/212 Generally LGTM ---
[GitHub] incubator-gearpump issue #210: [GEARPUMP-339] Fix unknown tag parameter
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/210 LGTM ---
[GitHub] incubator-gearpump issue #208: [GEARPUMP-338] Improve time related types and...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/208 LGTM ---
[GitHub] incubator-gearpump issue #207: [GEARPUMP-336] Add scalac options and fix war...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/207 LGTM. Is the failure related? ---
[GitHub] incubator-gearpump issue #205: [GEARPUMP-335] Set kafka tests log level to E...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/205 Good, please go ahead. ---
[GitHub] incubator-gearpump issue #204: [GEARPUMP-334] Fix Java WordCount DSL example
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/204 +1 ---
[GitHub] incubator-gearpump issue #205: [GEARPUMP-335] Set kafka tests log level to W...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/205 +1 ---
[GitHub] incubator-gearpump issue #203: [GEARPUMP-333] Fix examples assembling
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/203 +1 ---
[GitHub] incubator-gearpump issue #203: [GEARPUMP-333] Fix examples assembling
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/203 It's a random failure ---
[GitHub] incubator-gearpump pull request #202: [GEARPUMP-331] Allow applications to b...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/202#discussion_r130015303
--- Diff: docs/contents/dev/dev-write-1st-app.md ---
@@ -9,7 +9,7 @@ We'll use the classical [wordcount](https://github.com/apache/incubator-gearpump
     override val options: Array[(String, CLIOption[Any])] = Array.empty
     override def main(akkaConf: Config, args: Array[String]): Unit = {
-      val context = ClientContext(akkaConf)
+      val context = RuntimeEnvironment.get().newClientContext(akkaConf)
--- End diff --
```
def apply(config: Config): ClientContext = {
  RuntimeEnvironment.get().newClientContext(config)
}
```
Like this? Then what about
```
def apply(config: Config, system: ActorSystem, master: ActorRef): ClientContext
```
---
[GitHub] incubator-gearpump pull request #202: [GEARPUMP-331] Allow applications to b...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/202#discussion_r130030700
--- Diff: docs/contents/dev/dev-write-1st-app.md ---
@@ -9,7 +9,7 @@ We'll use the classical [wordcount](https://github.com/apache/incubator-gearpump
     override val options: Array[(String, CLIOption[Any])] = Array.empty
     override def main(akkaConf: Config, args: Array[String]): Unit = {
-      val context = ClientContext(akkaConf)
+      val context = RuntimeEnvironment.get().newClientContext(akkaConf)
--- End diff --
Which interface do you mean? ---
[GitHub] incubator-gearpump pull request #202: [GEARPUMP-331] Allow applications can ...
GitHub user huafengw opened a pull request: https://github.com/apache/incubator-gearpump/pull/202 [GEARPUMP-331] Allow applications can be ran in IDE Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the commit message is formatted like: `[GEARPUMP-] Meaningful description of pull request` - [ ] Make sure tests pass via `sbt clean test`. - [ ] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality. You can merge this pull request into a Git repository by running: $ git pull https://github.com/huafengw/incubator-gearpump GP331 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-gearpump/pull/202.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #202 commit c95a826fa24375cafeac009b1388c8f5ab3d3792 Author: huafengw <fvunic...@gmail.com> Date: 2017-07-27T05:09:28Z [GEARPUMP-331] Allow applications can be ran in IDE ---
[GitHub] incubator-gearpump issue #200: [GEARPUMP-330] Allow examples to run in `sbt ...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/200 LGTM ---
[GitHub] incubator-gearpump issue #200: [GEARPUMP-330] Allow examples to run in `sbt ...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/200 Document update? ---
[GitHub] incubator-gearpump issue #197: [GEARPUMP-327] Put jarstore and logs under "g...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/197 +1 ---
[GitHub] incubator-gearpump pull request #196: [GEARPUMP-322] Fix multiple SLF4J bind...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/196#discussion_r128453676 --- Diff: project/BuildExperiments.scala --- @@ -46,7 +46,8 @@ object BuildExperiments extends sbt.Build { "org.apache.hadoop" % "hadoop-mapreduce-client-core" % hadoopVersion, "org.apache.hadoop" % "hadoop-yarn-server-resourcemanager" % hadoopVersion % "provided", "org.apache.hadoop" % "hadoop-yarn-server-nodemanager" % hadoopVersion % "provided" -) +).map(_.exclude("org.slf4j", "slf4j-api")) --- End diff -- From gearpump-core ---
[GitHub] incubator-gearpump issue #194: [GEARPUMP-325] Fix license issues
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/194 +1 ---
[GitHub] incubator-gearpump issue #193: Bump up version to 0.8.5-SNAPSHOT
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/193 +1 ---
[GitHub] incubator-gearpump issue #192: [GEARPUMP-320] Handle stashed messages after ...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/192 +1 ---
[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/190#discussion_r123429407 --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.refactor.state + +import java.io._ +import java.time.Instant +import java.util +import java.util.Map + +import com.google.common.collect.Table +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.refactor.coder._ +import org.apache.gearpump.streaming.refactor.state.api.StateInternals +import org.apache.gearpump.streaming.refactor.state.heap.HeapStateInternalsFactory +import org.apache.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig} +import org.apache.gearpump.streaming.task.{Task, TaskContext, UpdateCheckpointClock} +import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory +import org.apache.gearpump.util.LogUtil +import org.apache.gearpump.{Message, TimeStamp} + +object StatefulTask { + val LOG = LogUtil.getLogger(getClass) +} + +/** + * stateful task that support state access and all state will be backed in memory + * after checkpoint state will persist into storage layer and it will guarantee + * 'exactly-once' process semantic + * + */ +abstract class StatefulTask(taskContext: TaskContext, conf: UserConfig) + extends Task(taskContext, conf) { + + import taskContext._ + + val checkpointStoreFactory = conf.getValue[CheckpointStoreFactory]( +PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get + val checkpointStore = checkpointStoreFactory.getCheckpointStore( +s"app$appId-task${taskId.processorId}_${taskId.index}") + val checkpointInterval = conf.getLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS).get + val checkpointManager = new CheckpointManager(checkpointInterval, checkpointStore) + + var inited = false + + // core state data + var encodedKeyStateMap: Map[String, Table[String, String, Array[Byte]]] = null + + /** + * subclass should override this method if they want to init state api + */ + def open: Unit = {} + + /** + * Subclass should override this method to specify how a new message should update state + */ + def 
invoke(message: Message): Unit + + def close: Unit = {} + + def getStateInternals[KT](keyCoder: Coder[KT], key: KT): StateInternals = { +if (!inited) { + throw new RuntimeException(" please init state access object in `open` method! ") +} +if (encodedKeyStateMap == null) { + encodedKeyStateMap = new util.HashMap[String, Table[String, String, Array[Byte]]]() +} + +val factory = new HeapStateInternalsFactory[KT](keyCoder, encodedKeyStateMap) +factory.stateInternalsForKey(key) + } + + final override def onStart(startTime: Instant): Unit = { +// recover state from snapshot +LOG.info("[onStart] - recover from snapshot") +val timestamp = startTime.toEpochMilli +checkpointManager + .recover(timestamp) + .foreach(recoverState(timestamp, _)) +reportCheckpointClock(timestamp) +inited = true +open + } + + final override def onNext(message: Message): Unit = { +checkpointManager.update(message.timestamp.toEpochMilli) +invoke(message) + } + + final override def onWatermarkProgress(watermark: Instant): Unit = { +if (checkpointManager.shouldCheckpoint(watermark.toEpochMilli)) { + checkpointManager.getCheckpointTime.foreach { checkpointTime => +val serialized = snapshot +checkpointManager.checkpoint(checkpointTime, serializ
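In the quoted `StatefulTask`, state access is gated by the `inited` flag: `onStart` recovers from the checkpoint, sets `inited = true`, and only then calls `open`, while `getStateInternals` throws if called earlier and creates `encodedKeyStateMap` lazily on first use. The dependency-free Scala sketch below models only that ordering. `MiniStatefulTask`, `CounterTask`, and `stateStore` are hypothetical names, the byte-array map is a simplification of the PR's Guava `Table`, and checkpoint recovery is left as a comment.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

// Hypothetical, dependency-free model of the lifecycle in StatefulTask:
// onStart recovers state, flips `inited`, then calls the user's `open`.
abstract class MiniStatefulTask {
  private var inited = false
  // Created lazily on first access, like encodedKeyStateMap in the diff.
  private var stateMap: JMap[String, Array[Byte]] = null

  /** Subclasses set up their state handles here. */
  def open(): Unit = {}

  protected def stateStore(): JMap[String, Array[Byte]] = {
    if (!inited) {
      throw new IllegalStateException("please access state from open() or later")
    }
    if (stateMap == null) {
      stateMap = new JHashMap[String, Array[Byte]]()
    }
    stateMap
  }

  final def onStart(): Unit = {
    // A real task would restore state from the latest checkpoint here.
    inited = true
    open()
  }
}

// Hypothetical subclass: state is first touched inside open().
class CounterTask extends MiniStatefulTask {
  def store: JMap[String, Array[Byte]] = stateStore()
  override def open(): Unit = store.put("count", Array[Byte](0))
}
```

Calling `store` on a fresh `CounterTask` fails; after `onStart()` the map exists and already holds the entry written in `open`.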
[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/190#discussion_r123429421 --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala ---
[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/190#discussion_r123428717 --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.refactor.state + +import java.util.Objects + +import org.apache.gearpump.streaming.refactor.coder.Coder +import org.apache.gearpump.streaming.refactor.state.api.{BagState, MapState, SetState, ValueState} + +/** + * a utility object for StateSpec + */ +object StateSpecs { + + private class ValueStateSpec[T](coder: Coder[T]) extends StateSpec[ValueState[T]] { + +var aCoder: Coder[T] = coder + +override def bind(id: String, binder: StateBinder): ValueState[T] = { + binder.bindValue(id, this, aCoder) +} + +override def offerCoders(coders: Array[Coder[ValueState[T]]]): Unit = { + if (this.aCoder == null) { +if (coders(0) != null) { + this.aCoder = coders(0).asInstanceOf[Coder[T]] +} + } +} + +override def finishSpecifying: Unit = { + if (aCoder == null) throw new IllegalStateException( +"Unable to infer a coder for ValueState and no Coder" ++ " was specified. 
Please set a coder by either invoking" ++ " StateSpecs.value(Coder valueCoder) or by registering the coder in the" ++ " Pipeline's CoderRegistry.") +} + +override def equals(obj: Any): Boolean = { + if (obj == this) true --- End diff -- This is not right in Scala: without `return`, `true` is just a discarded expression, so execution always falls through to the last line, `Objects.equals(this.aCoder, that.aCoder)`. ---
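The reviewer's point can be reproduced in a few lines. `discardedTrue` shows that `if (cond) true` without `else`, in statement position, has its value thrown away. The corrected `equals` is a sketch, not the code that was eventually merged: it uses a pattern match for the type test and `eq` for the reference check. The `eq` part matters as well: with `obj: Any`, `obj == this` calls `obj.equals(this)` whenever the two are different instances, so in the quoted version comparing two distinct specs re-enters `equals` and recurses until the stack overflows. `Coder`, `NamedCoder`, and `discardedTrue` are stand-ins so the block compiles without Gearpump.

```scala
import java.util.Objects

// Stand-ins for the PR's Coder type so the sketch compiles on its own.
trait Coder[T]
final case class NamedCoder[T](name: String) extends Coder[T]

// Mirrors the reviewed pattern: `if (cond) true` without `else` is a statement
// whose value is discarded, so the method always returns its last expression.
def discardedTrue(sameRef: Boolean): Boolean = {
  if (sameRef) true
  false
}

// Corrected sketch: reference check with `eq`, type test by pattern matching.
final class ValueStateSpec[T](val coder: Coder[T]) {
  override def equals(obj: Any): Boolean = obj match {
    case that: ValueStateSpec[_] => (this eq that) || Objects.equals(coder, that.coder)
    case _                       => false
  }

  // Consistent with equals: equal coders give equal hash codes.
  override def hashCode(): Int = Objects.hashCode(coder)
}
```

Matching on `ValueStateSpec[_]` rather than `ValueStateSpec[T]` also avoids an unchecked type test, since `T` is erased at runtime.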
[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/190#discussion_r123428767 --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.refactor.state + +import java.util.Objects + +import org.apache.gearpump.streaming.refactor.coder.Coder +import org.apache.gearpump.streaming.refactor.state.api.{BagState, MapState, SetState, ValueState} + +/** + * a utility object for StateSpec + */ +object StateSpecs { + + private class ValueStateSpec[T](coder: Coder[T]) extends StateSpec[ValueState[T]] { + +var aCoder: Coder[T] = coder + +override def bind(id: String, binder: StateBinder): ValueState[T] = { + binder.bindValue(id, this, aCoder) +} + +override def offerCoders(coders: Array[Coder[ValueState[T]]]): Unit = { + if (this.aCoder == null) { +if (coders(0) != null) { + this.aCoder = coders(0).asInstanceOf[Coder[T]] +} + } +} + +override def finishSpecifying: Unit = { + if (aCoder == null) throw new IllegalStateException( +"Unable to infer a coder for ValueState and no Coder" ++ " was specified. 
Please set a coder by either invoking" ++ " StateSpecs.value(Coder valueCoder) or by registering the coder in the" ++ " Pipeline's CoderRegistry.") +} + +override def equals(obj: Any): Boolean = { + if (obj == this) true + + if (!(obj.isInstanceOf[ValueStateSpec[T]])) false + + val that: ValueStateSpec[_] = obj.asInstanceOf[ValueStateSpec[_]] + Objects.equals(this.aCoder, that.aCoder) +} + +override def hashCode(): Int = { + Objects.hashCode(this.aCoder) +} + } + + private class BagStateSpec[T](coder: Coder[T]) extends StateSpec[BagState[T]] { + +private implicit var elemCoder = coder + +override def bind(id: String, binder: StateBinder): BagState[T] = + binder.bindBag(id, this, elemCoder) + +override def offerCoders(coders: Array[Coder[BagState[T]]]): Unit = { + if (this.elemCoder == null) { +if (coders(0) != null) { + this.elemCoder = coders(0).asInstanceOf[Coder[T]] +} + } +} + +override def finishSpecifying: Unit = { + if (elemCoder == null) { +throw new IllegalStateException("Unable to infer a coder for BagState and no Coder" + + " was specified. 
Please set a coder by either invoking" + + " StateSpecs.bag(Coder elemCoder) or by registering the coder in the" + + " Pipeline's CoderRegistry."); + } +} + +override def equals(obj: Any): Boolean = { + if (obj == this) true + + if (!obj.isInstanceOf[BagStateSpec[_]]) false + + val that = obj.asInstanceOf[BagStateSpec[_]] + Objects.equals(this.elemCoder, that.elemCoder) +} + +override def hashCode(): Int = Objects.hash(getClass, elemCoder) + } + + private class MapStateSpec[K, V](keyCoder: Coder[K], valueCoder: Coder[V]) +extends StateSpec[MapState[K, V]] { + +private implicit var kCoder = keyCoder +private implicit var vCoder = valueCoder + +override def bind(id: String, binder: StateBinder): MapState[K, V] = + binder.bindMap(id, this, keyCoder, valueCoder) + +override def offerCoders(coders: Array[Coder[MapState[K, V]]]): Unit = { + if (this.kCoder == null) { +if (coders(0) != null) { + this.kCoder = coders(0).asInstanceOf[Coder[K]] +} + } + + if
[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/190#discussion_r123428739 --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.refactor.state + +import java.util.Objects + +import org.apache.gearpump.streaming.refactor.coder.Coder +import org.apache.gearpump.streaming.refactor.state.api.{BagState, MapState, SetState, ValueState} + +/** + * a utility object for StateSpec + */ +object StateSpecs { + + private class ValueStateSpec[T](coder: Coder[T]) extends StateSpec[ValueState[T]] { + +var aCoder: Coder[T] = coder + +override def bind(id: String, binder: StateBinder): ValueState[T] = { + binder.bindValue(id, this, aCoder) +} + +override def offerCoders(coders: Array[Coder[ValueState[T]]]): Unit = { + if (this.aCoder == null) { +if (coders(0) != null) { + this.aCoder = coders(0).asInstanceOf[Coder[T]] +} + } +} + +override def finishSpecifying: Unit = { + if (aCoder == null) throw new IllegalStateException( +"Unable to infer a coder for ValueState and no Coder" ++ " was specified. 
Please set a coder by either invoking" ++ " StateSpecs.value(Coder valueCoder) or by registering the coder in the" ++ " Pipeline's CoderRegistry.") +} + +override def equals(obj: Any): Boolean = { + if (obj == this) true + + if (!(obj.isInstanceOf[ValueStateSpec[T]])) false + + val that: ValueStateSpec[_] = obj.asInstanceOf[ValueStateSpec[_]] + Objects.equals(this.aCoder, that.aCoder) +} + +override def hashCode(): Int = { + Objects.hashCode(this.aCoder) +} + } + + private class BagStateSpec[T](coder: Coder[T]) extends StateSpec[BagState[T]] { + +private implicit var elemCoder = coder + +override def bind(id: String, binder: StateBinder): BagState[T] = + binder.bindBag(id, this, elemCoder) + +override def offerCoders(coders: Array[Coder[BagState[T]]]): Unit = { + if (this.elemCoder == null) { +if (coders(0) != null) { + this.elemCoder = coders(0).asInstanceOf[Coder[T]] +} + } +} + +override def finishSpecifying: Unit = { + if (elemCoder == null) { +throw new IllegalStateException("Unable to infer a coder for BagState and no Coder" + + " was specified. Please set a coder by either invoking" + + " StateSpecs.bag(Coder elemCoder) or by registering the coder in the" + + " Pipeline's CoderRegistry."); + } +} + +override def equals(obj: Any): Boolean = { + if (obj == this) true --- End diff -- The same ---
[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/190#discussion_r123428752 --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala ---
[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/190#discussion_r123429442 --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala ---
[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/190#discussion_r123430323 --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternals.scala --- @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.refactor.state.heap + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream} +import java.lang.Iterable +import java.util +import java.util.Map.Entry +import java.util._ +import java.util.Objects + +import com.google.common.collect.Table +import org.apache.gearpump.streaming.refactor.coder.{Coder, ListCoder, MapCoder, SetCoder} +import org.apache.gearpump.streaming.refactor.state.{StateBinder, StateNamespace, StateSpec, StateTag} +import org.apache.gearpump.streaming.refactor.state.api._ +import org.apache.gearpump.util.LogUtil + +/** + * a heap memory backend StateInternals implementation + * it will cache state in heap memory with a Guava's Table structure + * there are three concept : + * (1) namespace mapping table's row id + * (2) tag mapping table's field + * (3) state mapping table's cell value + */ +class HeapStateInternals[K](key: K, stateTable: Table[String, String, Array[Byte]]) + extends StateInternals { + + val LOG = LogUtil.getLogger(getClass) + + private val k: K = key --- End diff -- redundant --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
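The Scaladoc in this diff describes the heap backend as a Guava `Table`: the namespace is the row key, the state tag is the column key, and the encoded state is the cell value. Below is a minimal sketch of that layout, assuming plain Scala collections in place of Guava. `HeapStateTable` and its methods are hypothetical and are not Gearpump API.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Table[String, String, Array[Byte]]:
// row key = namespace, column key = state tag, cell = encoded state bytes.
final class HeapStateTable {
  private val rows = mutable.Map.empty[String, mutable.Map[String, Array[Byte]]]

  /** Stores the encoded bytes of one state cell. */
  def put(namespace: String, tag: String, value: Array[Byte]): Unit = {
    val row = rows.getOrElseUpdate(namespace, mutable.Map.empty[String, Array[Byte]])
    row.update(tag, value)
  }

  /** Reads one cell, if present. */
  def get(namespace: String, tag: String): Option[Array[Byte]] =
    rows.get(namespace).flatMap(_.get(tag))

  /** All tags stored under one namespace (a Table "row"). */
  def row(namespace: String): Map[String, Array[Byte]] =
    rows.get(namespace).map(_.toMap).getOrElse(Map.empty)
}
```

Because cells hold raw bytes, a coder (such as the `Coder` types imported in the diff) still has to encode and decode each value. The table itself is only the storage.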
[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/190#discussion_r123429491 --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.refactor.state + +import java.io._ +import java.time.Instant +import java.util +import java.util.Map + +import com.google.common.collect.Table +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.refactor.coder._ +import org.apache.gearpump.streaming.refactor.state.api.StateInternals +import org.apache.gearpump.streaming.refactor.state.heap.HeapStateInternalsFactory +import org.apache.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig} +import org.apache.gearpump.streaming.task.{Task, TaskContext, UpdateCheckpointClock} +import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory +import org.apache.gearpump.util.LogUtil +import org.apache.gearpump.{Message, TimeStamp} + +object StatefulTask { + val LOG = LogUtil.getLogger(getClass) +} + +/** + * stateful task that support state access and all state will be backed in memory + * after checkpoint state will persist into storage layer and it will guarantee + * 'exactly-once' process semantic + * + */ +abstract class StatefulTask(taskContext: TaskContext, conf: UserConfig) + extends Task(taskContext, conf) { + + import taskContext._ + + val checkpointStoreFactory = conf.getValue[CheckpointStoreFactory]( +PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get + val checkpointStore = checkpointStoreFactory.getCheckpointStore( +s"app$appId-task${taskId.processorId}_${taskId.index}") + val checkpointInterval = conf.getLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS).get + val checkpointManager = new CheckpointManager(checkpointInterval, checkpointStore) + + var inited = false + + // core state data + var encodedKeyStateMap: Map[String, Table[String, String, Array[Byte]]] = null + + /** + * subclass should override this method if they want to init state api + */ + def open: Unit = {} + + /** + * Subclass should override this method to specify how a new message should update state + */ + def 
invoke(message: Message): Unit + + def close: Unit = {} + + def getStateInternals[KT](keyCoder: Coder[KT], key: KT): StateInternals = { +if (!inited) { + throw new RuntimeException(" please init state access object in `open` method! ") +} +if (encodedKeyStateMap == null) { + encodedKeyStateMap = new util.HashMap[String, Table[String, String, Array[Byte]]]() +} + +val factory = new HeapStateInternalsFactory[KT](keyCoder, encodedKeyStateMap) +factory.stateInternalsForKey(key) + } + + final override def onStart(startTime: Instant): Unit = { +// recover state from snapshot +LOG.info("[onStart] - recover from snapshot") +val timestamp = startTime.toEpochMilli +checkpointManager + .recover(timestamp) + .foreach(recoverState(timestamp, _)) +reportCheckpointClock(timestamp) +inited = true +open + } + + final override def onNext(message: Message): Unit = { +checkpointManager.update(message.timestamp.toEpochMilli) +invoke(message) + } + + final override def onWatermarkProgress(watermark: Instant): Unit = { +if (checkpointManager.shouldCheckpoint(watermark.toEpochMilli)) { + checkpointManager.getCheckpointTime.foreach { checkpointTime => +val serialized = snapshot +checkpointManager.checkpoint(checkpointTime, serializ
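The order in `onStart` matters: recover the snapshot, set `inited`, then call `open`. Because of that order, `getStateInternals` fails only if state is touched before the task has started. The sketch below is a hypothetical, dependency-free model of that lifecycle. `MiniStatefulTask`, `WordCountTask`, and the `Map`-based state are illustrations, not the `StatefulTask` API, and checkpointing is omitted.

```scala
// Hypothetical model of the StatefulTask lifecycle shown in the diff:
// onStart restores a snapshot, sets `inited`, then calls the user hook `open`.
abstract class MiniStatefulTask {
  private var inited = false
  protected var state: Map[String, Long] = Map.empty

  /** User hook, called once state is available (mirrors `open`). */
  def open(): Unit = {}

  /** User logic per message (mirrors `invoke`). */
  def invoke(value: String): Unit

  /** Guarded state access (mirrors the `inited` check in `getStateInternals`). */
  protected def stateAccess(): Map[String, Long] = {
    if (!inited) throw new IllegalStateException("state is only available from `open` onwards")
    state
  }

  final def onStart(recovered: Option[Map[String, Long]]): Unit = {
    recovered.foreach(s => state = s) // recover from the last checkpoint, if any
    inited = true
    open()
  }

  final def onNext(value: String): Unit = invoke(value)

  /** What would be handed to the checkpoint store. */
  def snapshot: Map[String, Long] = state
}

/** Example subclass: counts words in task state. */
final class WordCountTask extends MiniStatefulTask {
  override def invoke(word: String): Unit = {
    val counts = stateAccess()
    state = counts.updated(word, counts.getOrElse(word, 0L) + 1)
  }
}
```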
[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/190#discussion_r123430243 --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/InMemoryGlobalStateInternals.scala --- @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.refactor.state + +import java.util +import java.util.Map.Entry +import java.util.{ArrayList, HashSet, List, Set} +import java.lang.Iterable + +import com.google.common.collect.{HashBasedTable, Table} +import org.apache.gearpump.streaming.refactor.coder.Coder +import org.apache.gearpump.streaming.refactor.state.InMemoryGlobalStateInternals.InMemoryStateBinder +import org.apache.gearpump.streaming.refactor.state.api._ + +/** + * a no namespace (global) in memory state internal. + * it is a default implementation inspired by Apache Beam. + * but do not recommend use it! 
+ */ +class InMemoryGlobalStateInternals[K] protected(key: K) extends StateInternals { + + private val k: K = key --- End diff -- Looks like `k` is redundant since you already have `key`. ---
[GitHub] incubator-gearpump issue #189: [Gearpump 311] refactor state management
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/189 Hi @yanghua, thanks for your contribution! This is a huge pull request, so I think it will take a while to review it fully. Also, could you add a description of this pull request covering its main idea or design? That would be a great help, thanks! ---
[GitHub] incubator-gearpump issue #188: [GEARPUMP-318] Exclude maven-plugin-api depen...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/188 +1 ---
[GitHub] incubator-gearpump pull request #187: [GEARPUMP-317] Fix Task minClock
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/187#discussion_r122588303 --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala --- @@ -118,25 +126,34 @@ class DefaultWindowRunner[IN, OUT]( } }) fnRunner.finish().foreach { -out: OUT => outputs += TimestampedValue(out, firstWin.endTime.minusMillis(1)) +out: OUT => + outputs += TimestampedValue(out, firstWin.endTime.minusMillis(1)) } + val newWmk = TaskUtil.max(wmk, firstWin.endTime) if (windows.accumulationMode == Discarding) { fnRunner.teardown() setup = false // discarding, setup need to be called for each window -onTrigger(outputs) +onTrigger(outputs, newWmk) --- End diff -- Move `onTrigger` outside of the `if else` ---
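A sketch of the suggested refactoring follows. It assumes that the `else` branch, which the diff does not show, also ends with the same `onTrigger(outputs, newWmk)` call. In that case only the teardown and `setup` reset remain mode-specific, and the trigger can be hoisted. `TriggerSketch` and the `AccumulationMode` objects are hypothetical stand-ins for `DefaultWindowRunner`'s fields.

```scala
sealed trait AccumulationMode
case object Discarding extends AccumulationMode
case object Accumulating extends AccumulationMode

// Hypothetical, simplified model of the end of DefaultWindowRunner's trigger logic.
final class TriggerSketch(mode: AccumulationMode, onTrigger: (List[String], Long) => Unit) {
  var setup = true
  var teardowns = 0

  def fire(outputs: List[String], newWatermark: Long): Unit = {
    if (mode == Discarding) {
      teardowns += 1 // stands in for fnRunner.teardown()
      setup = false  // discarding: setup must run again for the next window
    }
    // Hoisted out of the if/else: both modes emit the same outputs and watermark.
    onTrigger(outputs, newWatermark)
  }
}
```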
[GitHub] incubator-gearpump pull request #187: [GEARPUMP-317] Fix Task minClock
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/187#discussion_r122588434 --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala --- @@ -40,7 +41,7 @@ class Subscription( executorId: Int, taskId: TaskId, subscriber: Subscriber, sessionId: Int, -transport: ExpressTransport, +sender: TaskActor, --- End diff -- I think `sender` here is just the current task actor, while the name literally suggests the upstream task actor. ---
[GitHub] incubator-gearpump pull request #186: [GEARPUMP-316] Decouple groupBy from w...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/186#discussion_r120526897 --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala --- @@ -32,35 +32,34 @@ object WindowFunction { } } -trait WindowFunction[T] { +trait WindowFunction { - def apply(context: WindowFunction.Context[T]): Array[Window] + def apply[T](context: WindowFunction.Context[T]): Array[Window] def isNonMerging: Boolean } -abstract class NonMergingWindowFunction[T] extends WindowFunction[T] { +abstract class NonMergingWindowFunction extends WindowFunction { override def isNonMerging: Boolean = true } -case class GlobalWindowFunction[T]() extends NonMergingWindowFunction[T] { +case class GlobalWindowFunction() extends NonMergingWindowFunction { - override def apply(context: WindowFunction.Context[T]): Array[Window] = { + override def apply[T](context: WindowFunction.Context[T]): Array[Window] = { Array(Window(Instant.ofEpochMilli(MIN_TIME_MILLIS), --- End diff -- We can create just one static object; there is no need to create a new one each time since it's immutable. ---
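Below is a minimal sketch of the "one static object" suggestion. The global `Window` is built once and reused on every call. This assumes a simplified `Window` case class and `apply[T](element: T)` signature in place of Gearpump's `Window` and `WindowFunction.Context`. `Instant.MIN`/`Instant.MAX` are placeholders for the bounds used in the real code.

```scala
import java.time.Instant

// Hypothetical stand-in for Gearpump's Window; the bounds are placeholders.
final case class Window(startTime: Instant, endTime: Instant)

trait WindowFunction {
  def apply[T](element: T): Array[Window]
  def isNonMerging: Boolean
}

object GlobalWindowFunction extends WindowFunction {
  // Built once and shared: a case class with immutable fields is safe to reuse.
  private val GlobalWindow: Window = Window(Instant.MIN, Instant.MAX)

  // A fresh Array per call, because arrays are mutable and a caller could modify one.
  override def apply[T](element: T): Array[Window] = Array(GlobalWindow)

  override def isNonMerging: Boolean = true
}
```

The sketch caches the `Window` but not the array. Sharing one `Array` across callers would also avoid an allocation, but any caller could then overwrite its element.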
[GitHub] incubator-gearpump issue #186: [GEARPUMP-316] Decouple groupBy from window
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/186 First, an unrelated comment: there is an `org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner`, which looks like it is in the wrong place. ---
[GitHub] incubator-gearpump pull request #186: [GEARPUMP-316] Decouple groupBy from w...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/186#discussion_r120526614 --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.dsl.task + +import java.time.Instant +import java.util.function.Consumer + +import com.gs.collections.impl.map.mutable.UnifiedMap +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.Constants.{GEARPUMP_STREAMING_GROUPBY_FUNCTION, GEARPUMP_STREAMING_OPERATOR} +import org.apache.gearpump.streaming.dsl.window.impl.WindowRunner +import org.apache.gearpump.streaming.task.{Task, TaskContext} + +/** + * Processes messages in groups as defined by groupBy function. 
+ */ +class GroupByTask[IN, GROUP, OUT]( +groupBy: IN => GROUP, +taskContext: TaskContext, +userConfig: UserConfig) extends Task(taskContext, userConfig) { + + def this(context: TaskContext, conf: UserConfig) = { +this( + conf.getValue[IN => GROUP](GEARPUMP_STREAMING_GROUPBY_FUNCTION)(context.system).get, + context, conf +) + } + + private val groups: UnifiedMap[GROUP, WindowRunner[IN, OUT]] = +new UnifiedMap[GROUP, WindowRunner[IN, OUT]] + + override def onNext(message: Message): Unit = { +val input = message.value.asInstanceOf[IN] +val group = groupBy(input) + +if (!groups.containsKey(group)) { + groups.put(group, +userConfig.getValue[WindowRunner[IN, OUT]]( + GEARPUMP_STREAMING_OPERATOR)(taskContext.system).get) +} + +groups.get(group).process(input, message.timestamp) + } + + override def onWatermarkProgress(watermark: Instant): Unit = { +groups.values.forEach(new Consumer[WindowRunner[IN, OUT]] { --- End diff -- Looks like you just create a consumer, but will it actually be called? ---
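On the question above: `java.lang.Iterable.forEach` calls the consumer's `accept` once for each element. This assumes `UnifiedMap.values` behaves like any `java.util.Map` value collection here. Passing the anonymous `Consumer` to `forEach` therefore does run it, provided its truncated body actually triggers each runner. The sketch below is hypothetical and dependency-free. It uses `java.util.HashMap` and a stand-in `Runner` in place of `WindowRunner`.

```scala
import java.util.function.Consumer

object ForEachSketch {
  // Hypothetical stand-in for WindowRunner: remembers the last watermark it saw.
  final class Runner {
    var lastWatermark: Long = -1L
    def trigger(watermark: Long): Unit = lastWatermark = watermark
  }

  def onWatermark(groups: java.util.Map[String, Runner], watermark: Long): Unit =
    // Iterable.forEach invokes accept() once for every value in the map.
    groups.values().forEach(new Consumer[Runner] {
      override def accept(runner: Runner): Unit = runner.trigger(watermark)
    })
}
```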
[GitHub] incubator-gearpump issue #185: [GEARPUMP-315] Add GlobalWindows and implemen...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/185 +1 ---
[GitHub] incubator-gearpump issue #184: [GEARPUMP-308] Fix TransformTask output time
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/184 +1 ---
[GitHub] incubator-gearpump pull request #183: [GEARPUMP-312] Add Message trait and D...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/183#discussion_r117160623 --- Diff: integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DefaultMessageDeliverySpec.scala --- @@ -28,7 +28,7 @@ import org.apache.gearpump.integrationtest.{TestSpecBase, Util} /** * Checks message delivery consistency, like at-least-once, and exactly-once. */ -class MessageDeliverySpec extends TestSpecBase { +class DefaultMessageDeliverySpec extends TestSpecBase { --- End diff -- Do we need to change this name? ---
[GitHub] incubator-gearpump issue #183: [GEARPUMP-312] Add Message trait and DefaultM...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/183 +1 ---
[GitHub] incubator-gearpump issue #183: [GEARPUMP-312] Add Message trait and DefaultM...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/183 Document update? ---
[GitHub] incubator-gearpump issue #183: [GEARPUMP-312] Add Message trait and DefaultM...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/183 Can we, and should we, add an `apply` function to trait Message's companion object that returns a `DefaultMessage`? ---
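Below is a minimal sketch of the companion `apply` proposed above. It assumes that `Message` exposes `value` and `timestamp`, as used elsewhere in these diffs (`message.value`, `message.timestamp.toEpochMilli`), and that `DefaultMessage` is a case class with those two fields. The real field list may differ.

```scala
import java.time.Instant

// Hypothetical shapes; the real trait Message and DefaultMessage may have other members.
trait Message {
  def value: Any
  def timestamp: Instant
}

final case class DefaultMessage(value: Any, timestamp: Instant) extends Message

object Message {
  // Callers can write Message(v, t) while the concrete class stays DefaultMessage.
  def apply(value: Any, timestamp: Instant): Message = DefaultMessage(value, timestamp)
}
```

The factory's declared return type is the trait, so call sites stay decoupled from `DefaultMessage`. The concrete class can change later without touching them.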
[GitHub] incubator-gearpump issue #182: [GEARPUMP-307] Fix TransformTask$Transform in...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/182 +1 ---
[GitHub] incubator-gearpump issue #181: [GEARPUMP-305] Add Internals section to docs
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/181 +1 ---
[GitHub] incubator-gearpump pull request #180: [GEARPUMP-303] add a RabbitMQ sink to ...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/180#discussion_r115162272 --- Diff: experiments/rabbitmq/src/test/scala/org/apache/gearpump/experimental/rabbitmq/RabbitmqSinkSpec.scala --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experimental.rabbitmq + +import com.rabbitmq.client.AMQP.Connection +import com.rabbitmq.client.ConnectionFactory +import org.apache.gearpump.Message --- End diff -- And it would be better if there were no unused imports. ---
[GitHub] incubator-gearpump pull request #180: [GEARPUMP-303] add a RabbitMQ sink to ...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/180#discussion_r115161088 --- Diff: experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.experimental.rabbitmq + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.sink.DataSink +import org.apache.gearpump.streaming.task.TaskContext +import com.rabbitmq.client.Channel +import com.rabbitmq.client.{Connection, ConnectionFactory} + +class RMQSink(userConfig: UserConfig, +val connFactory: (UserConfig) => ConnectionFactory) extends DataSink{ + + var connectionFactory: ConnectionFactory = connFactory(userConfig) + var connection: Connection = null + var channel: Channel = null + var queueName: String = null + + def this(userConfig: UserConfig) = { +this(userConfig, RMQSink.getConnectionFactory) + } + + override def open(context: TaskContext): Unit = { +connection = connectionFactory.newConnection +channel = connection.createChannel +if (channel == null) { + throw new RuntimeException("None of RabbitMQ channels are available.") +} +setupQueue() + } + + override def write(message: Message): Unit = { +publish(message.msg) + } + + override def close(): Unit = { +channel.close() +connection.close() + } + + protected def setupQueue(): Unit = { +val queue = RMQSink.getQueueName(userConfig) +if (queue.isEmpty) { + throw new RuntimeException("can not get a RabbitMQ queue name") +} + +queueName = queue.get +channel.queueDeclare(queue.get, false, false, false, null) + } + + def publish(msg: Any): Unit = { +msg match { + case seq: Seq[Any] => +seq.foreach(publish) + case str: String => { +channel.basicPublish("", queueName, null, msg.asInstanceOf[String].getBytes) + } + case byteArray: Array[Byte] => { +channel.basicPublish("", queueName, null, byteArray) + } + case _ => { --- End diff -- Maybe we can log a warning here. ---
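Below is a minimal sketch of the suggested warning in the fallback case. To stay self-contained, it does not depend on RabbitMQ. `send` stands in for `channel.basicPublish("", queueName, null, bytes)` and `warn` stands in for a logger's warn method; both are hypothetical constructor parameters. The sketch matches `Seq[_]` instead of `Seq[Any]`, which avoids an unchecked type-pattern warning.

```scala
import java.nio.charset.StandardCharsets

// Dependency-free sketch of RMQSink.publish with a warning in the fallback case.
final class PublishSketch(send: Array[Byte] => Unit, warn: String => Unit) {
  def publish(msg: Any): Unit = msg match {
    case seq: Seq[_]        => seq.foreach(publish)
    case str: String        => send(str.getBytes(StandardCharsets.UTF_8))
    case bytes: Array[Byte] => send(bytes)
    case other =>
      // Instead of silently dropping the message, report what was dropped.
      val kind = if (other == null) "null" else other.getClass.getName
      warn(s"RMQSink: unsupported message type $kind, message dropped")
  }
}
```

The sketch encodes strings with an explicit UTF-8 charset, whereas the diff's `getBytes` uses the platform default charset. That choice is an assumption, not something the pull request specifies.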
[GitHub] incubator-gearpump issue #178: [GEARPUMP-303] add a RabbitMQ sink to integra...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/178 Hi @yanghua, maybe you can close this one now :) ---
[GitHub] incubator-gearpump pull request #178: [GEARPUMP-303] add a RabbitMQ sink to ...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/178#discussion_r114961959 --- Diff: external/rabbitmq/src/main/scala/org/apache/gearpump/external/rabbitmq/RMQSink.scala --- @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.external.rabbitmq + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.sink.DataSink +import org.apache.gearpump.streaming.task.TaskContext +import com.rabbitmq.client.Channel +import com.rabbitmq.client.{Connection, ConnectionFactory} + +class RMQSink(userConfig: UserConfig) extends DataSink{ + + var connection: Connection = null + var channel: Channel = null + var queueName: String = null + + override def open(context: TaskContext): Unit = { +val factory : ConnectionFactory = RMQSink.getConnectionFactory(userConfig) +connection = factory.newConnection +channel = connection.createChannel +if (channel == null) { + throw new RuntimeException("None of RabbitMQ channels are available.") +} +setupQueue() + } + + override def write(message: Message): Unit = { +publish(message.msg) + } + + override def close(): Unit = { +channel.close() +connection.close() + } + + protected def setupQueue(): Unit = { +val queue = RMQSink.getQueueName(userConfig) +if (!queue.nonEmpty) { + throw new RuntimeException("can not get a RabbitMQ queue name") +} + +queueName = queue.get +channel.queueDeclare(queue.get, false, false, false, null) + } + + def publish(msg: Any): Unit = { +msg match { + case seq: Seq[Any] => +seq.foreach(publish) + case str: String => { +channel.basicPublish("", queueName, null, msg.asInstanceOf[String].getBytes) + } + case byteArray: Array[Byte@unchecked] => { +channel.basicPublish("", queueName, null, byteArray) + } + case _ => { + + } +} + } + +} + +object RMQSink { + + val RMQSINK = "rmqsink" + val QUEUE_NAME = "rabbitmq.queue.name" + val SERVER_HOST = "rabbitmq.connection.host" + val SERVER_PORT = "rabbitmq.connection.port" + val CONNECTION_URI = "rabbitmq.connection.uri" + val VIRTUAL_HOST = "rabbitmq.virtualhost" + val AUTH_USERNAME = "rabbitmq.auth.username" + val AUTH_PASSWORD = "rabbitmq.auth.password" + val AUTOMATIC_RECOVERY = 
"rabbitmq.automatic.recovery" + val CONNECTION_TIMEOUT = "rabbitmq.connection.timeout" + val NETWORK_RECOVERY_INTERVAL = "rabbitmq.network.recovery.interval" + val REQUESTED_HEARTBEAT = "rabbitmq.requested.heartbeat" + val TOPOLOGY_RECOVERY_ENABLED = "rabbitmq.topology.recoveryenabled" + val REQUESTED_CHANNEL_MAX = "rabbitmq.channel.max" + val REQUESTED_FRAME_MAX = "rabbitmq.frame.max" + + def getConnectionFactory(userConfig : UserConfig): ConnectionFactory = { +val factory : ConnectionFactory = new ConnectionFactory; --- End diff -- No need for the semicolon here. ---
[GitHub] incubator-gearpump pull request #178: [GEARPUMP-303] add a RabbitMQ sink to ...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/178#discussion_r114962623 --- Diff: external/rabbitmq/README.md --- @@ -0,0 +1,21 @@ +# Gearpump RabbitMQ + +Gearpump integration for [RabbitMQ](https://www.rabbitmq.com/) + +## Usage + +The message type that RMQSink is able to handle including: + + 1. String + 2. Array[Byte] + 3. Sequence of type 1 and 2 + +Suppose there is a DataSource Task will output above-mentioned messages, you can write a simple application then: + +```scala +val sink = new RMQSink(UserConfig.empty, "$tableName") +val sinkProcessor = DataSinkProcessor(sink, "$sinkNum") +val split = Processor[DataSource]("$splitNum") +val computation = split ~> sinkProcessor +val application = StreamApplication("RabbitMQ", Graph(computation), UserConfig.empty) +``` --- End diff -- Probably we also need some configuration docs here, like `rabbitmq.connection.host`, etc. ---
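A configuration section could list the keys that `object RMQSink` declares. Below is a hedged sketch of how three of those keys (host, port, queue name) might be read and validated. To keep the example self-contained, configuration is modeled as a plain `Map[String, String]` rather than Gearpump's `UserConfig`, and `RmqSettings` is a hypothetical holder. The defaults are assumptions: 5672 is RabbitMQ's standard AMQP port, and `localhost` is illustrative.

```scala
// Hypothetical settings holder; the key strings are the ones declared in object RMQSink.
final case class RmqSettings(host: String, port: Int, queueName: String)

object RmqSettings {
  val QUEUE_NAME = "rabbitmq.queue.name"
  val SERVER_HOST = "rabbitmq.connection.host"
  val SERVER_PORT = "rabbitmq.connection.port"

  def fromMap(conf: Map[String, String]): RmqSettings = {
    val queue = conf.get(QUEUE_NAME).filter(_.nonEmpty)
    // `queue.isEmpty` reads better than the `!queue.nonEmpty` check in setupQueue.
    if (queue.isEmpty) throw new IllegalArgumentException(s"$QUEUE_NAME must be set")
    RmqSettings(
      host = conf.getOrElse(SERVER_HOST, "localhost"),          // assumed default
      port = conf.get(SERVER_PORT).map(_.toInt).getOrElse(5672), // standard AMQP port
      queueName = queue.get)
  }
}
```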
[GitHub] incubator-gearpump pull request #178: [GEARPUMP-303] add a RabbitMQ sink to ...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/178#discussion_r114962293 --- Diff: external/rabbitmq/src/test/scala/org/apache/gearpump/external/rabbitmq/RabbitmqSinkSpec.scala --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.external.rabbitmq + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.TaskContext +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +class RabbitmqSinkSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { + + property("HBaseSink should insert a row successfully") { --- End diff -- The property description is not correct: it still refers to HBaseSink. ---
[GitHub] incubator-gearpump issue #174: [GEARPUMP-295] Fix the failure of getting sta...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/174 @manuzhang ---
[GitHub] incubator-gearpump pull request #174: [GEARPUMP-295] Fix the failure of gett...
GitHub user huafengw opened a pull request: https://github.com/apache/incubator-gearpump/pull/174 [GEARPUMP-295] Fix the failure of getting stalling tasks from clock service Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the commit message is formatted like: `[GEARPUMP-] Meaningful description of pull request` - [ ] Make sure tests pass via `sbt clean test`. - [ ] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality. You can merge this pull request into a Git repository by running: $ git pull https://github.com/huafengw/incubator-gearpump stalling Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-gearpump/pull/174.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #174 commit 3daeaccc1f1f7967bab95f78fdf1430bf82849fc Author: huafengw <fvunic...@gmail.com> Date: 2017-04-14T06:43:24Z [GEARPUMP-295] Fix the failure of getting stalling tasks from clock service ---
[GitHub] incubator-gearpump issue #170: [GEARPUMP-291] Refactor TaskActor
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/170 +1 ---
[GitHub] incubator-gearpump issue #169: [GEARPUMP-285] fix false alarm of shutting do...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/169 @manuzhang ---
[GitHub] incubator-gearpump issue #169: [GEARPUMP-285] fix false alarm of shutting do...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/169 I'll refactor the status implementation into a simple state machine. ---
[GitHub] incubator-gearpump issue #169: [GEARPUMP-285] fix false alarm of shutting do...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/169 Tests pass: https://travis-ci.org/huafengw/incubator-gearpump/builds/209307018 ---
[GitHub] incubator-gearpump pull request #169: [GEARPUMP-285] fix false alarm of shut...
GitHub user huafengw opened a pull request: https://github.com/apache/incubator-gearpump/pull/169 [GEARPUMP-285] fix false alarm of shutting down executor time out Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the commit message is formatted like: `[GEARPUMP-] Meaningful description of pull request` - [ ] Make sure tests pass via `sbt clean test`. - [ ] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality. You can merge this pull request into a Git repository by running: $ git pull https://github.com/huafengw/incubator-gearpump timeout Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-gearpump/pull/169.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #169 commit 22decc4435cec306fe28c3512a930959366b985c Author: huafengw <fvunic...@gmail.com> Date: 2017-03-09T10:56:06Z [GEARPUMP-285] fix false alarm of shutting down executor time out ---
[GitHub] incubator-gearpump issue #165: [GEARPUMP-287] Trigger data process on waterm...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/165 +1 ---
[GitHub] incubator-gearpump issue #162: [GEARPUMP-281] Using new version of docker im...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/162 Hi Karol, would you mind making a small change in org.apache.gearpump.integrationtest.Docker.scala, changing the implementation of `getNetworkGateway` to ``` final def getNetworkGateway(container: String): String = { trace(container, s"Get gateway of container...") { Docker.inspect(container, "--format={{.NetworkSettings.Gateway}}") } } ``` and then rerunning the integration tests in your Ubuntu environment to check whether the change works? ---
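For reference, here is a sketch of the same lookup using `scala.sys.process` directly instead of the integration test's `Docker.inspect` helper. The object and method names are hypothetical. The Go template is passed as a single argument with no shell involved, so it needs no extra quoting.

```scala
import scala.sys.process._

// Hypothetical sketch: queries a container's gateway via the docker CLI.
object DockerGatewaySketch {

  // Argument vector handed straight to docker (no shell), so the template is not re-quoted.
  def gatewayCommand(container: String): Seq[String] =
    Seq("docker", "inspect", "--format={{.NetworkSettings.Gateway}}", container)

  // Runs the command and returns stdout without the trailing newline;
  // `!!` throws a RuntimeException if docker exits with a non-zero status.
  def networkGateway(container: String): String =
    gatewayCommand(container).!!.trim
}
```

Only `gatewayCommand` is pure; `networkGateway("master0")` needs a running Docker daemon and an existing container.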
[GitHub] incubator-gearpump issue #162: [GEARPUMP-281] Using new version of docker im...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/162 +1. Merging, thanks Karol! ---
[GitHub] incubator-gearpump issue #158: [GEARPUMP-267] Changing docker image for Kafk...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/158 Hi Karol, this pull request includes the former GEARPUMP-236 and is also covered by your other PR, so maybe we can close this one? ---
[GitHub] incubator-gearpump pull request #162: [GEARPUMP-281] Using new version of do...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/162#discussion_r103382701 --- Diff: integrationtest/docker/README.md --- @@ -1,28 +1,23 @@ -# Gearpump Launcher Docker Image - -The image helps developer to setup/test a local [Gearpump](https://github.com/apache/incubator-gearpump.git) cluster quickly. The image is based on a minimal JRE8 environment with Python support. - -## Usage - -Here are the commands to launch a cluster. You can launch as many worker containers as you wish but only one master for the time being. -``` -export GEARPUMP_HOME=/path/to/gearpump - -docker run -d \ - -h master0 --name master0 \ - -v $GEARPUMP_HOME:/opt/gearpump \ - -e JAVA_OPTS=-Dgearpump.cluster.masters.0=master0:3000 \ - -p 8090:8090 \ - stanleyxu2005/gearpump-launcher \ - master -ip master0 -port 3000 - -docker run -d \ - --link master0 \ - -v $GEARPUMP_HOME:/opt/gearpump \ - -e JAVA_OPTS=-Dgearpump.cluster.masters.0=master0:3000 \ - stanleyxu2005/gearpump-launcher \ - worker - -docker exec master0 gear info -docker exec master0 gear app -jar /path/to/userapp.jar [mainclass] [args] -``` +This folder contains docker images definitions used in integration tests of Gearpump. + + +These include: + + * [The standalone single node Kafka cluster with Zookeeper](/kafka) + * [The Hadoop image](/hadoop) + * [The Gearpump Cluster Launcher and Storm Client](/gearpump) + * [Java 8 JRE image](/java) + + +We decided to fork spotify/kafka image, because the project does not maintain proper tagging. +For now our tests focus on Kafka 0.8.x, and the project does not support this version. + +Hadoop docker image (https://hub.docker.com/r/sequenceiq/hadoop-docker/) is well maintained. +We rely on version 2.6.0 and we feel there is no need to duplicate it. + +Gearpump Cluster Launcher helps developer to setup/test a local Gearpump cluster quickly. +The image is based on a minimal JRE8 environment with Python support. 
+ +We used to base Gearpump Cluster Launcher on errordeveloper/oracle-jre image but it stuck on version 8u66-b17 and doesn't support tagging. +We also probably hit a Java bug (http://bugs.java.com/bugdatabase/view_bug.do?bug_id=8133205) that requires never version of JRE. --- End diff -- Typo: 'never' => 'newer' ---
[GitHub] incubator-gearpump pull request #162: [GEARPUMP-281] Using new version of do...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/162#discussion_r103382851 --- Diff: integrationtest/docker/java/README.md --- @@ -0,0 +1,18 @@ +A minimalistic Oracle JDK 8 container on top of busybox. + +We used to base Gearpump Cluster Launcher on errordeveloper/oracle-jre image but it stuck on version 8u66-b17 and doesn't support tagging. +We also probably hit a Java bug (http://bugs.java.com/bugdatabase/view_bug.do?bug_id=8133205) that requires never version of JRE. --- End diff -- Same typo here ('never' => 'newer'). ---
[GitHub] incubator-gearpump pull request #162: [GEARPUMP-281] Using new version of do...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/162#discussion_r103385452 --- Diff: integrationtest/docker/kafka/README.md --- @@ -0,0 +1,89 @@ +Kafka in Docker +=== + +**This is a fork spotify/kafka image. We decided to make it, because the project does not maintain proper tagging. +For now our tests focus on Kafka 0.8.x, so here is a version that supports Kafka 0.8.x. +For latest version of kafka image go to the original project.** + +This repository provides everything you need to run Kafka in Docker. + +For convenience also contains a packaged proxy that can be used to get data from +a legacy Kafka 7 cluster into a dockerized Kafka 8. + +Why? +--- +The main hurdle of running Kafka in Docker is that it depends on Zookeeper. +Compared to other Kafka docker images, this one runs both Zookeeper and Kafka +in the same container. This means: + +* No dependency on an external Zookeeper host, or linking to another container +* Zookeeper and Kafka are configured to work together out of the box + +Run +--- + +```bash +docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`docker-machine ip \`docker-machine active\`` --env ADVERTISED_PORT=9092 grubykarol/kafka --- End diff -- Does this require docker-machine? ---
[GitHub] incubator-gearpump issue #161: [GEARPUMP-283] Return app exception to client
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/161 +1 ---