[GitHub] incubator-gearpump issue #247: [GEARPUMP-377] Add TwitterSource and examples

2018-05-07 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/247
  
+1


---


[GitHub] incubator-gearpump issue #247: [GEARPUMP-377] Add TwitterSource and examples

2018-05-06 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/247
  
What will happen if the app has multiple source tasks? Will each task have 
the same output?


---


[GitHub] incubator-gearpump issue #244: [GEARPUMP-373] Don't create hbase configurati...

2018-04-29 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/244
  
+1


---


[GitHub] incubator-gearpump issue #243: [GEARPUMP-339] Fix unknown tag parameter

2018-04-22 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/243
  
+1


---


[GitHub] incubator-gearpump pull request #227: [GEARPUMP-350] Fix the not started app...

2018-04-03 Thread huafengw
Github user huafengw closed the pull request at:

https://github.com/apache/incubator-gearpump/pull/227


---


[GitHub] incubator-gearpump issue #240: [GEARPUMP-344] Fix broken links in README

2018-03-15 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/240
  
+1


---


[GitHub] incubator-gearpump issue #239: [GEARPUMP-366] Create JarStore dir if not exi...

2018-03-15 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/239
  
+1


---


[GitHub] incubator-gearpump issue #238: [GEARPUMP-350] Fix app clock not started

2018-03-14 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/238
  
+1


---


[GitHub] incubator-gearpump issue #233: [GEARPUMP-361] Build gearpump docker image lo...

2017-12-09 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/233
  
+1


---


[GitHub] incubator-gearpump pull request #236: [GEARPUMP-362] Fix bin/gear info shows...

2017-10-27 Thread huafengw
GitHub user huafengw opened a pull request:

https://github.com/apache/incubator-gearpump/pull/236

[GEARPUMP-362] Fix bin/gear info shows nothing when there are apps running

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the commit message is formatted like:
   `[GEARPUMP-] Meaningful description of pull request` 
 - [ ] Make sure tests pass via `sbt clean test`.
 - [ ] Make sure old documentation affected by the pull request has been 
updated and new documentation added for new functionality. 



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huafengw/incubator-gearpump info

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-gearpump/pull/236.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #236


commit 65cf99014279198cae9826dc057f79419c035520
Author: huafengw <fvunic...@gmail.com>
Date:   2017-10-28T03:25:54Z

[GEARPUMP-362] Fix bin/gear info shows nothing when there are apps running




---


[GitHub] incubator-gearpump issue #233: [GEARPUMP-361] Build gearpump docker image lo...

2017-10-27 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/233
  
1. I have to build with explicit proxies, e.g.
`docker build --build-arg HTTP_PROXY=... --build-arg HTTPS_PROXY=...`.
Otherwise, the build step
`fetch http://dl-cdn.alpinelinux.org/alpine/v3.6/main/x86_64/APKINDEX.tar.gz`
always fails.

2. Please add the following change to `integrationtest/docker/gearpump/Dockerfile`:
```
-RUN apk add --no-cache python && \
+RUN apk add --update curl && \
+apk add --no-cache python && \
```

3. After resolving these issues, the integration test still can't proceed. Here is the error log:
```
[DEBUG] [15:21:49.279] [Docker$] Container master0>> curl http://master0:8090/api/v1.0/master
[DEBUG] [15:21:49.279] [Docker$] INPUT==>>
[DEBUG] [15:21:49.279] [ShellExec$] EXEC master0 => `docker exec master0 curl -s  --cookie cookie.txt http://master0:8090/api/v1.0/master`
[DEBUG] [15:21:49.636] [ShellExec$] EXEC master0 <= `
Redirection
Redirect
` exit 0
[DEBUG] [15:21:49.636] [Docker$] <<==OUTPUT
[DEBUG] [15:21:49.636] [Docker$] 
Redirection
Redirect

[DEBUG] [15:21:49.637] [Docker$] <<Command END. Container master0, curl http://master0:8090/api/v1.0/master

[ERROR] [15:21:49.637] [RestClient] Failed to decode Rest response to MasterData
[ERROR] [15:21:49.637] [Util$] Failed due to (false == cluster running), retrying for the 15 times...
```


---


[GitHub] incubator-gearpump pull request #227: [GEARPUMP-350] Fix the not started app...

2017-10-26 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/227#discussion_r147065810
  
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala ---
@@ -361,10 +358,11 @@ class TaskActor(
   }
 
   private def getSubscriptionWatermark(subs: List[(Int, Subscription)], wmk: Instant): Instant = {
+val wmkInMilli = wmk.toEpochMilli
--- End diff --

To avoid multiple `toEpochMilli` calls
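A minimal sketch of the hoisting. For illustration only, it assumes the method takes the minimum of the task watermark and the subscriptions' watermarks. `SubscriptionLike` is a hypothetical stand-in for `Subscription`, and this is not the actual `TaskActor` code.

```scala
import java.time.Instant

object WatermarkSketch {
  // Hypothetical stand-in for Subscription: only its watermark (epoch millis) matters here.
  final case class SubscriptionLike(watermarkMillis: Long)

  // Minimum of the task's own watermark and every subscription watermark.
  // wmk.toEpochMilli is computed once, before the fold, not once per subscription.
  def minWatermark(subs: List[(Int, SubscriptionLike)], wmk: Instant): Instant = {
    val wmkInMilli = wmk.toEpochMilli
    val minMilli = subs.foldLeft(wmkInMilli) { case (acc, (_, sub)) =>
      math.min(acc, sub.watermarkMillis)
    }
    Instant.ofEpochMilli(minMilli)
  }
}
```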


---


[GitHub] incubator-gearpump pull request #234: [GEARPUMP-359] Fix OutputWatermark adv...

2017-10-26 Thread huafengw
GitHub user huafengw opened a pull request:

https://github.com/apache/incubator-gearpump/pull/234

[GEARPUMP-359] Fix OutputWatermark advancing logic in Subscription




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huafengw/incubator-gearpump subs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-gearpump/pull/234.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #234


commit 311e09b28ebe8ac8525b447e831615065dcc541c
Author: huafengw <fvunic...@gmail.com>
Date:   2017-10-26T05:52:39Z

[GEARPUMP-359] Fix OutputWatermark advancing logic in Subscription




---


[GitHub] incubator-gearpump issue #233: [WIP] Fix integration test

2017-10-23 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/233
  
Verified on CentOS; the updated image launches successfully.


---


[GitHub] incubator-gearpump pull request #232: [GEARPUMP-358] Decrease the frequency ...

2017-10-18 Thread huafengw
GitHub user huafengw opened a pull request:

https://github.com/apache/incubator-gearpump/pull/232

[GEARPUMP-358] Decrease the frequency of watermark calculation




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huafengw/incubator-gearpump regre

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-gearpump/pull/232.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #232


commit 3456bfcfe1fa643f37dedbb88c6e4090d6df5823
Author: huafengw <fvunic...@gmail.com>
Date:   2017-10-18T11:19:44Z

[GEARPUMP-358] Decrease the frequency of watermark calculation




---


[GitHub] incubator-gearpump issue #231: [GEARPUMP-355] Fix YarnAppMaster address reso...

2017-10-12 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/231
  
Hi @titikakatoo, thanks for your response. I'd like to know how you set up 
your cluster. I just set up a single-node secured YARN cluster, and the 
command you posted works fine there. 

How many nodes are in your cluster, and where do you run the yarnClient 
command: on one of the cluster nodes? Also, is there any special 
configuration I should be aware of? 


---


[GitHub] incubator-gearpump issue #231: [GEARPUMP-355] Fix YarnAppMaster address reso...

2017-10-12 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/231
  
Hi @titikakatoo, thanks for your contribution! 
Can you briefly describe how to reproduce the problem you found? I'm 
setting up a secured cluster to verify your pull request. 
Also, does this pull request depend on #230?


---


[GitHub] incubator-gearpump pull request #227: [GEARPUMP-350] Fix the not started app...

2017-09-29 Thread huafengw
GitHub user huafengw opened a pull request:

https://github.com/apache/incubator-gearpump/pull/227

[GEARPUMP-350] Fix the not started app clock




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huafengw/incubator-gearpump clock

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-gearpump/pull/227.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #227


commit 31c124455d362f853aa122970fd2e8ad925a8976
Author: huafengw <fvunic...@gmail.com>
Date:   2017-09-29T08:02:59Z

[GEARPUMP-350] Fix the not started app clock




---


[GitHub] incubator-gearpump issue #227: [GEARPUMP-350] Fix the not started app clock

2017-09-29 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/227
  
Verified the streaming examples, including complex DAG, SOL, WordCount DSL 
and WordCount Java.


---


[GitHub] incubator-gearpump pull request #226: [WIP] Don't use window runner for non-...

2017-09-28 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/226#discussion_r141637090
  
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala ---
@@ -73,6 +73,25 @@ case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE],
   }
 }
 
+class DirectWindowRunner[IN, OUT](fnRunner: FunctionRunner[IN, OUT])
--- End diff --

Maybe change `WindowRunner` to a more generic name? Then 
`DirectWindowRunner` can be a non-window runner.
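A simplified sketch can make the naming suggestion concrete. All names here are hypothetical (`StreamRunner`, `DirectRunner`, `CountWindowRunner`), and this is not Gearpump's `WindowRunner`/`FunctionRunner` API. Because the generic trait says nothing about windows, a direct runner that applies a function per element fits it as naturally as a windowed one does.

```scala
object RunnerSketch {
  // Hypothetical generic abstraction: consumes inputs, emits outputs on trigger.
  trait StreamRunner[IN, OUT] {
    def process(in: IN): Unit
    def trigger(): List[OUT]
  }

  // Non-windowed runner: applies the function to each element immediately.
  final class DirectRunner[IN, OUT](fn: IN => OUT) extends StreamRunner[IN, OUT] {
    private var buffer = List.empty[OUT]
    def process(in: IN): Unit = {
      buffer = fn(in) :: buffer
    }
    def trigger(): List[OUT] = {
      val out = buffer.reverse // restore arrival order
      buffer = Nil
      out
    }
  }

  // Windowed runner (simplified to fixed-size count windows for illustration).
  final class CountWindowRunner[IN, OUT](size: Int, fn: List[IN] => OUT)
      extends StreamRunner[IN, OUT] {
    private var pending = Vector.empty[IN]
    private var ready = Vector.empty[OUT]
    def process(in: IN): Unit = {
      pending :+= in
      if (pending.size == size) { // window is full: aggregate it
        ready :+= fn(pending.toList)
        pending = Vector.empty
      }
    }
    def trigger(): List[OUT] = {
      val out = ready.toList
      ready = Vector.empty
      out
    }
  }
}
```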


---


[GitHub] incubator-gearpump pull request #223: [GEARPUMP-349] Optimize Graph topologi...

2017-09-18 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/223#discussion_r139345072
  
--- Diff: core/src/main/scala/org/apache/gearpump/util/Graph.scala ---
@@ -318,12 +360,8 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
* http://www.drdobbs.com/database/topological-sorting/184410262
*/
   def topologicalOrderWithCirclesIterator: Iterator[N] = {
-if (hasCycle()) {
-  val topo = getAcyclicCopy().topologicalOrderIterator
-  topo.flatMap(_.sortBy(_indexs(_)).iterator)
-} else {
-  topologicalOrderIterator
-}
+val topo = getAcyclicCopy().topologicalOrderIterator
+topo.flatMap(_.sortBy(indexs(_)).iterator)
--- End diff --

It's not. `topo` is an `Iterator[List[N]]`: the lists are emitted in 
topological order, but the nodes within each list are not sorted.
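A small illustration with plain collections. The `levels` iterator and `index` map are hypothetical stand-ins for `getAcyclicCopy().topologicalOrderIterator` and `indexs`. Order holds only between levels, so each level is sorted before flattening.

```scala
object TopoFlattenSketch {
  // Levels arrive in topological order, but nodes inside a level do not.
  // Sorting each level by insertion index makes the flattened output stable.
  def flattenStable[N](levels: Iterator[List[N]], index: Map[N, Int]): List[N] =
    levels.flatMap(_.sortBy(index(_)).iterator).toList
}
```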


---


[GitHub] incubator-gearpump pull request #223: [GEARPUMP-349] Optimize Graph topologi...

2017-09-14 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/223#discussion_r138880775
  
--- Diff: core/src/main/scala/org/apache/gearpump/util/Graph.scala ---
@@ -81,28 +86,35 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
* out degree
*/
   def outDegreeOf(node: N): Int = {
-edges.count(_._1 == node)
+outgoingEdgesOf(node).size
   }
 
   /**
* in degree
*/
   def inDegreeOf(node: N): Int = {
-edges.count(_._3 == node)
+incomingEdgesOf(node).size
   }
 
   /**
* out going edges.
*/
   def outgoingEdgesOf(node: N): List[(N, E, N)] = {
-edges.filter(_._1 == node)
+_outEdges.getOrElse(node, List.empty)
   }
 
   /**
* incoming edges.
*/
   def incomingEdgesOf(node: N): List[(N, E, N)] = {
-edges.filter(_._3 == node)
+_inEdges.getOrElse(node, List.empty)
+  }
+
+  /**
+   * adjacent vertices.
+   */
+  def adjacentVertices(node: N): List[N] = {
--- End diff --

It's a directed graph
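Because the graph is directed, "adjacent" has to pick a direction. The sketch below assumes edges are `(from, label, to)` triples as in `Graph`, with lookup maps built once (like `_outEdges`/`_inEdges` in the diff). It reads `adjacentVertices` as successors, meaning the targets of outgoing edges. That is one possible reading and not necessarily what the PR implements. `SimpleGraph` is a hypothetical class.

```scala
object DirectedAdjacencySketch {
  final class SimpleGraph[N, E](edges: List[(N, E, N)]) {
    // Built once, so degree and edge queries avoid scanning the whole edge list.
    private val outEdges: Map[N, List[(N, E, N)]] = edges.groupBy(_._1)
    private val inEdges: Map[N, List[(N, E, N)]] = edges.groupBy(_._3)

    def outgoingEdgesOf(node: N): List[(N, E, N)] = outEdges.getOrElse(node, List.empty)
    def incomingEdgesOf(node: N): List[(N, E, N)] = inEdges.getOrElse(node, List.empty)
    def outDegreeOf(node: N): Int = outgoingEdgesOf(node).size
    def inDegreeOf(node: N): Int = incomingEdgesOf(node).size

    // Directed: only targets of outgoing edges count as adjacent here.
    def adjacentVertices(node: N): List[N] = outgoingEdgesOf(node).map(_._3)
  }
}
```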


---


[GitHub] incubator-gearpump pull request #223: [GEARPUMP-349] Optimize Graph topologi...

2017-09-14 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/223#discussion_r138881132
  
--- Diff: core/src/main/scala/org/apache/gearpump/util/Graph.scala ---
@@ -243,13 +259,34 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
* The node returned by Iterator is stable sorted.
*/
   def topologicalOrderIterator: Iterator[N] = {
-val newGraph = copy
-var output = List.empty[N]
-
-while (!newGraph.isEmpty) {
-  output ++= newGraph.removeZeroInDegree
+tryTopologicalOrderIterator.get
--- End diff --

I think throwing an exception is better than an infinite loop.
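A sketch of the fail-fast variant, using Kahn's algorithm over a plain vertex list and `(from, to)` edge pairs. This simplifies `Graph`'s labeled edges, and the name `tryTopologicalOrder` only mirrors `tryTopologicalOrderIterator` in the diff. When nodes remain but none has in-degree 0, the sketch returns a `Failure` instead of looping.

```scala
import scala.util.{Failure, Success, Try}

object TopoTrySketch {
  // Kahn's algorithm: repeatedly emit all nodes with in-degree 0.
  // If nodes remain but none is ready, the graph has a cycle.
  def tryTopologicalOrder[N](vertices: List[N], edges: List[(N, N)]): Try[List[N]] = {
    var inDegree: Map[N, Int] = vertices.map(v => v -> 0).toMap
    edges.foreach { case (_, to) =>
      inDegree = inDegree.updated(to, inDegree.getOrElse(to, 0) + 1)
    }
    val successors: Map[N, List[N]] =
      edges.groupBy(_._1).map { case (from, es) => from -> es.map(_._2) }

    var remaining = vertices
    val output = List.newBuilder[N]
    while (remaining.nonEmpty) {
      val (ready, blocked) = remaining.partition(v => inDegree(v) == 0)
      if (ready.isEmpty) {
        return Failure(new IllegalStateException("Graph has a cycle"))
      }
      output ++= ready
      ready.foreach { v =>
        successors.getOrElse(v, Nil).foreach { s =>
          inDegree = inDegree.updated(s, inDegree(s) - 1)
        }
      }
      remaining = blocked
    }
    Success(output.result())
  }
}
```

With this shape, a cycle check reduces to `tryTopologicalOrder(vs, es).isFailure`, which is the same idea as the new `hasCycle()` in this PR.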


---


[GitHub] incubator-gearpump pull request #223: [GEARPUMP-349] Optimize Graph topologi...

2017-09-14 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/223#discussion_r138880939
  
--- Diff: core/src/main/scala/org/apache/gearpump/util/Graph.scala ---
@@ -165,7 +181,7 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
* edges connected to node
*/
   def edgesOf(node: N): List[(N, E, N)] = {
-(incomingEdgesOf(node) ++ outgoingEdgesOf(node)).toSet[(N, E, N)].toList.sortBy(_indexs(_))
+(incomingEdgesOf(node) ++ outgoingEdgesOf(node)).distinct.sortBy(_indexs(_))
--- End diff --

```
// This is used to ensure the output of this Graph is always stable
// Like method vertices(), or edges()
private var _indexs = Map.empty[Any, Int]
```


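The `_indexs` map is what keeps the output stable. The sketch below is a simplified version of the same idea, with hypothetical names (`IndexedGraph`, `edgeIndex`) and an index built from the edge list's insertion order. `distinct` drops the second copy of a self-loop edge and preserves encounter order, but that order is still incoming-then-outgoing, so the `sortBy` is what restores insertion order. With `toSet...toList`, the order before sorting would be arbitrary.

```scala
object StableEdgesSketch {
  final class IndexedGraph[N, E](edgeList: List[(N, E, N)]) {
    // Insertion index of each edge, used to return edges in a stable order.
    private val edgeIndex: Map[(N, E, N), Int] = edgeList.zipWithIndex.toMap

    def edgesOf(node: N): List[(N, E, N)] = {
      val incoming = edgeList.filter(_._3 == node)
      val outgoing = edgeList.filter(_._1 == node)
      // A self-loop shows up in both lists; distinct keeps one copy,
      // and sortBy puts the result back into insertion order.
      (incoming ++ outgoing).distinct.sortBy(edgeIndex(_))
    }
  }
}
```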
---


[GitHub] incubator-gearpump pull request #223: [GEARPUMP-349] Optimize Graph topologi...

2017-09-14 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/223#discussion_r138882103
  
--- Diff: core/src/main/scala/org/apache/gearpump/util/Graph.scala ---
@@ -355,19 +393,7 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial
* check whether there is a loop
*/
   def hasCycle(): Boolean = {
-@tailrec
-def detectCycle(graph: Graph[N, E]): Boolean = {
-  if (graph.edges.isEmpty) {
-false
-  } else if (graph.vertices.nonEmpty && !graph.vertices.exists(graph.inDegreeOf(_) == 0)) {
-true
-  } else {
-graph.removeZeroInDegree
-detectCycle(graph)
-  }
-}
-
-detectCycle(copy)
+tryTopologicalOrderIterator.isFailure
--- End diff --

The graph is mutable, so if we cache the result we have to check whether 
the graph has changed since it was computed.
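One way to do that check is sketched below with a hypothetical `MutableGraph`. It bumps a version counter on every mutation and stores that version alongside the cached result, so a stale cache is recomputed instead of being returned. The cycle detector inside is a simple edge-pruning loop, chosen for brevity.

```scala
object CachedCycleSketch {
  final class MutableGraph[N] {
    private var edges = List.empty[(N, N)]
    // Incremented on every mutation so cached results can be detected as stale.
    private var version = 0L
    private var cachedHasCycle: Option[(Long, Boolean)] = None

    def addEdge(from: N, to: N): Unit = {
      edges = (from, to) :: edges
      version += 1
    }

    // Reuse the cached answer only if the graph is unchanged since it was computed.
    def hasCycle: Boolean = cachedHasCycle match {
      case Some((v, result)) if v == version => result
      case _ =>
        val result = detectCycle()
        cachedHasCycle = Some((version, result))
        result
    }

    // Repeatedly drop edges whose source has no incoming edge; any leftover edges lie on or after a cycle.
    private def detectCycle(): Boolean = {
      var remaining = edges
      var changed = true
      while (remaining.nonEmpty && changed) {
        val targets = remaining.map(_._2).toSet
        val kept = remaining.filter { case (from, _) => targets.contains(from) }
        changed = kept.size != remaining.size
        remaining = kept
      }
      remaining.nonEmpty
    }
  }
}
```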


---


[GitHub] incubator-gearpump pull request #223: [GEARPUMP-349] Optimize Graph topologi...

2017-09-05 Thread huafengw
GitHub user huafengw opened a pull request:

https://github.com/apache/incubator-gearpump/pull/223

[GEARPUMP-349] Optimize Graph topologicalOrderIterator performance




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huafengw/incubator-gearpump graph

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-gearpump/pull/223.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #223


commit 2673c99cb26a8a0733af772f01748f818ea08857
Author: huafengw <fvunic...@gmail.com>
Date:   2017-09-05T08:59:49Z

[GEARPUMP-349] Optimize Graph topologicalOrderIterator performance




---


[GitHub] incubator-gearpump pull request #222: [GEARPUMP-348] Allow application total...

2017-09-01 Thread huafengw
GitHub user huafengw opened a pull request:

https://github.com/apache/incubator-gearpump/pull/222

[GEARPUMP-348] Allow application total number of retries to be configurable




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huafengw/incubator-gearpump restart

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-gearpump/pull/222.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #222


commit 1ff5ac05904320fd64580529897822524edb349a
Author: huafengw <fvunic...@gmail.com>
Date:   2017-09-01T06:21:19Z

[GEARPUMP-348] Allow application total number of retries to be configurable




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-gearpump issue #221: [GEARPUMP-217] Merge sql branch into master

2017-08-30 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/221
  
+1


---


[GitHub] incubator-gearpump issue #215: [GEARPUMP-343] Fix typo of EmbeddedRuntimeEnv...

2017-08-17 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/215
  
```
The job exceeded the maximum time limit for jobs, and has been terminated.
```


---


[GitHub] incubator-gearpump pull request #215: [GEARPUMP-343] Fix typo of EmbeddedRun...

2017-08-16 Thread huafengw
GitHub user huafengw opened a pull request:

https://github.com/apache/incubator-gearpump/pull/215

[GEARPUMP-343] Fix typo of EmbeddedRuntimeEnvironment




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huafengw/incubator-gearpump minor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-gearpump/pull/215.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #215


commit c6b11b56deb4cc16f3aa96013b453a7f70e646e6
Author: huafengw <fvunic...@gmail.com>
Date:   2017-08-17T02:37:47Z

[GEARPUMP-343] Fix typo of EmbeddedRuntimeEnvironment




---


[GitHub] incubator-gearpump issue #214: [GEARPUMP-341] Update processing watermark in...

2017-08-16 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/214
  
+1


---


[GitHub] incubator-gearpump issue #213: [GEARPUMP-339] Add ScalaDoc to window api and...

2017-08-13 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/213
  
+1


---


[GitHub] incubator-gearpump pull request #213: [GEARPUMP-339] Add ScalaDoc to window ...

2017-08-13 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/213#discussion_r132863041
  
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala ---
@@ -137,11 +156,14 @@ class DefaultWindowRunner[IN, OUT](
   }
   onTrigger(outputs, newWmk)
 } else {
-  // minimum of end of last triggered window and start of first un-triggered window
+  // The output watermark is the minimum of end of last triggered window
+  // and start of first un-triggered window
   TriggeredOutputs(outputs, TaskUtil.min(wmk, firstWin.startTime))
 }
   } else {
+// All windows have been triggered.
 if (time == Watermark.MAX) {
+  // This means there will no more inputs so it's safe to advance to the maximum watermark.
--- End diff --

there will be no more
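The rule described by the code comments in this hunk can be sketched as follows. `Instant.MAX` stands in for `Watermark.MAX`, and `Window` is a hypothetical case class. The fallback for the case where every window has been triggered but the input watermark has not reached the maximum is an assumption of this sketch, because the hunk does not show it.

```scala
import java.time.Instant

object OutputWatermarkSketch {
  // Hypothetical window with start and end instants.
  final case class Window(startTime: Instant, endTime: Instant)

  private def min(a: Instant, b: Instant): Instant = if (a.isBefore(b)) a else b

  def outputWatermark(lastTriggeredEnd: Instant, firstUntriggered: Option[Window],
      inputWatermark: Instant): Instant = firstUntriggered match {
    // Minimum of end of last triggered window and start of first un-triggered window.
    case Some(win) => min(lastTriggeredEnd, win.startTime)
    // All windows triggered and no more input will arrive: advance to the maximum.
    case None if inputWatermark == Instant.MAX => Instant.MAX
    // Assumption for this sketch: otherwise hold at the end of the last triggered window.
    case None => lastTriggeredEnd
  }
}
```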


---


[GitHub] incubator-gearpump issue #213: [GEARPUMP-339] Add ScalaDoc to window api and...

2017-08-13 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/213
  
No unit test failed, so where does the failure come from?


---


[GitHub] incubator-gearpump pull request #212: [GEARPUMP-339] Add ScalaDoc to Streami...

2017-08-06 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/212#discussion_r131545040
  
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala ---
@@ -62,6 +62,17 @@ class StreamApp(
 val dag = planner.plan(graph)
 StreamApplication(name, dag, userConfig)
   }
+
+  def source[T](dataSource: DataSource, parallelism: Int = 1,
  conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = {
+implicit val sourceOp = DataSourceOp(dataSource, parallelism, description, conf)
--- End diff --

Does it have to be implicit?


---


[GitHub] incubator-gearpump issue #212: [GEARPUMP-339] Add ScalaDoc to Streaming DSL

2017-08-06 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/212
  
Generally LGTM


---


[GitHub] incubator-gearpump issue #210: [GEARPUMP-339] Fix unknown tag parameter

2017-08-05 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/210
  
LGTM


---


[GitHub] incubator-gearpump issue #208: [GEARPUMP-338] Improve time related types and...

2017-08-03 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/208
  
LGTM


---


[GitHub] incubator-gearpump issue #207: [GEARPUMP-336] Add scalac options and fix war...

2017-08-03 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/207
  
LGTM. Is the failure related? 


---


[GitHub] incubator-gearpump issue #205: [GEARPUMP-335] Set kafka tests log level to E...

2017-08-02 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/205
  
Good, please go ahead.


---


[GitHub] incubator-gearpump issue #204: [GEARPUMP-334] Fix Java WordCount DSL example

2017-08-02 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/204
  
+1


---


[GitHub] incubator-gearpump issue #205: [GEARPUMP-335] Set kafka tests log level to W...

2017-08-02 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/205
  
+1


---


[GitHub] incubator-gearpump issue #203: [GEARPUMP-333] Fix examples assembling

2017-08-01 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/203
  
+1


---


[GitHub] incubator-gearpump issue #203: [GEARPUMP-333] Fix examples assembling

2017-08-01 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/203
  
It's a random failure


---


[GitHub] incubator-gearpump pull request #202: [GEARPUMP-331] Allow applications to b...

2017-07-28 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/202#discussion_r130015303
  
--- Diff: docs/contents/dev/dev-write-1st-app.md ---
@@ -9,7 +9,7 @@ We'll use the classical [wordcount](https://github.com/apache/incubator-gearpump
  override val options: Array[(String, CLIOption[Any])] = Array.empty

  override def main(akkaConf: Config, args: Array[String]): Unit = {
-   val context = ClientContext(akkaConf)
+   val context = RuntimeEnvironment.get().newClientContext(akkaConf)
--- End diff --

```
def apply(config: Config): ClientContext = {
  RuntimeEnvironment.get().newClientContext(config)
}
```
Like this? Then what about
```
def apply(config: Config, system: ActorSystem, master: ActorRef): ClientContext
```



---


[GitHub] incubator-gearpump pull request #202: [GEARPUMP-331] Allow applications to b...

2017-07-28 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/202#discussion_r130030700
  
--- Diff: docs/contents/dev/dev-write-1st-app.md ---
@@ -9,7 +9,7 @@ We'll use the classical [wordcount](https://github.com/apache/incubator-gearpump
  override val options: Array[(String, CLIOption[Any])] = Array.empty

  override def main(akkaConf: Config, args: Array[String]): Unit = {
-   val context = ClientContext(akkaConf)
+   val context = RuntimeEnvironment.get().newClientContext(akkaConf)
--- End diff --

Which interface do you mean?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-gearpump pull request #202: [GEARPUMP-331] Allow applications can ...

2017-07-27 Thread huafengw
GitHub user huafengw opened a pull request:

https://github.com/apache/incubator-gearpump/pull/202

[GEARPUMP-331] Allow applications can be ran in IDE

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the commit message is formatted like:
   `[GEARPUMP-] Meaningful description of pull request` 
 - [ ] Make sure tests pass via `sbt clean test`.
 - [ ] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huafengw/incubator-gearpump GP331

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-gearpump/pull/202.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #202


commit c95a826fa24375cafeac009b1388c8f5ab3d3792
Author: huafengw <fvunic...@gmail.com>
Date:   2017-07-27T05:09:28Z

[GEARPUMP-331] Allow applications can be ran in IDE




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-gearpump issue #200: [GEARPUMP-330] Allow examples to run in `sbt ...

2017-07-25 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/200
  
LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-gearpump issue #200: [GEARPUMP-330] Allow examples to run in `sbt ...

2017-07-25 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/200
  
Document update?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-gearpump issue #197: [GEARPUMP-327] Put jarstore and logs under "g...

2017-07-20 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/197
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-gearpump pull request #196: [GEARPUMP-322] Fix multiple SLF4J bind...

2017-07-20 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/196#discussion_r128453676
  
--- Diff: project/BuildExperiments.scala ---
@@ -46,7 +46,8 @@ object BuildExperiments extends sbt.Build {
   "org.apache.hadoop" % "hadoop-mapreduce-client-core" % hadoopVersion,
   "org.apache.hadoop" % "hadoop-yarn-server-resourcemanager" % hadoopVersion % "provided",
   "org.apache.hadoop" % "hadoop-yarn-server-nodemanager" % hadoopVersion % "provided"
-)
+).map(_.exclude("org.slf4j", "slf4j-api"))
--- End diff --

From gearpump-core


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-gearpump issue #194: [GEARPUMP-325] Fix license issues

2017-07-16 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/194
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-gearpump issue #193: Bump up version to 0.8.5-SNAPSHOT

2017-07-14 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/193
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-gearpump issue #192: [GEARPUMP-320] Handle stashed messages after ...

2017-06-22 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/192
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...

2017-06-22 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/190#discussion_r123429407
  
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.io._
+import java.time.Instant
+import java.util
+import java.util.Map
+
+import com.google.common.collect.Table
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.refactor.coder._
+import org.apache.gearpump.streaming.refactor.state.api.StateInternals
+import org.apache.gearpump.streaming.refactor.state.heap.HeapStateInternalsFactory
+import org.apache.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, UpdateCheckpointClock}
+import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
+import org.apache.gearpump.util.LogUtil
+import org.apache.gearpump.{Message, TimeStamp}
+
+object StatefulTask {
+  val LOG = LogUtil.getLogger(getClass)
+}
+
+/**
+ * stateful task that support state access and all state will be backed in memory
+ * after checkpoint state will persist into storage layer and it will guarantee
+ * 'exactly-once' process semantic
+ *
+ */
+abstract class StatefulTask(taskContext: TaskContext, conf: UserConfig)
+  extends Task(taskContext, conf) {
+
+  import taskContext._
+
+  val checkpointStoreFactory = conf.getValue[CheckpointStoreFactory](
+    PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get
+  val checkpointStore = checkpointStoreFactory.getCheckpointStore(
+    s"app$appId-task${taskId.processorId}_${taskId.index}")
+  val checkpointInterval = conf.getLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS).get
+  val checkpointManager = new CheckpointManager(checkpointInterval, checkpointStore)
+
+  var inited = false
+
+  // core state data
+  var encodedKeyStateMap: Map[String, Table[String, String, Array[Byte]]] = null
+
+  /**
+   * subclass should override this method if they want to init state api
+   */
+  def open: Unit = {}
+
+  /**
+   * Subclass should override this method to specify how a new message should update state
+   */
+  def invoke(message: Message): Unit
+
+  def close: Unit = {}
+
+  def getStateInternals[KT](keyCoder: Coder[KT], key: KT): StateInternals = {
+    if (!inited) {
+      throw new RuntimeException(" please init state access object in `open` method! ")
+    }
+    if (encodedKeyStateMap == null) {
+      encodedKeyStateMap = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+    }
+
+    val factory = new HeapStateInternalsFactory[KT](keyCoder, encodedKeyStateMap)
+    factory.stateInternalsForKey(key)
+  }
+
+  final override def onStart(startTime: Instant): Unit = {
+    // recover state from snapshot
+    LOG.info("[onStart] - recover from snapshot")
+    val timestamp = startTime.toEpochMilli
+    checkpointManager
+      .recover(timestamp)
+      .foreach(recoverState(timestamp, _))
+    reportCheckpointClock(timestamp)
+    inited = true
+    open
+  }
+
+  final override def onNext(message: Message): Unit = {
+    checkpointManager.update(message.timestamp.toEpochMilli)
+    invoke(message)
+  }
+
+  final override def onWatermarkProgress(watermark: Instant): Unit = {
+    if (checkpointManager.shouldCheckpoint(watermark.toEpochMilli)) {
+      checkpointManager.getCheckpointTime.foreach { checkpointTime =>
+        val serialized = snapshot
+        checkpointManager.checkpoint(checkpointTime, serializ
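
The lifecycle in the quoted `StatefulTask` (recover the latest checkpoint in `onStart`, set `inited`, call `open`, then route each message to `invoke` and checkpoint on watermark progress) can be illustrated with a dependency-free sketch. All names here (`InMemoryCheckpointStore`, `MiniStatefulTask`, `CountTask`) are hypothetical; state is a plain `Map[String, Long]` rather than coder-encoded `Table`s, and the checkpoint rule (snapshot once the watermark passes the next checkpoint time) is a simplification, not `CheckpointManager`'s actual bookkeeping.

```scala
import scala.collection.mutable

// Hypothetical in-memory store: checkpoint time -> snapshot of the task state.
final class InMemoryCheckpointStore {
  private var snapshots = Map.empty[Long, Map[String, Long]]

  def persist(time: Long, state: Map[String, Long]): Unit =
    snapshots += (time -> state)

  // Latest snapshot taken at or before `time`, if any.
  def recover(time: Long): Option[Map[String, Long]] = {
    val eligible = snapshots.keys.filter(_ <= time)
    if (eligible.isEmpty) None else Some(snapshots(eligible.max))
  }
}

// Hypothetical, heavily simplified analogue of the quoted StatefulTask lifecycle.
abstract class MiniStatefulTask(store: InMemoryCheckpointStore, checkpointInterval: Long) {
  private var inited = false
  private var nextCheckpoint = 0L
  protected val state: mutable.Map[String, Long] = mutable.Map.empty

  /** Subclasses may override this to set up state access. */
  def open(): Unit = {}

  /** Subclasses define how one message updates state. */
  def invoke(key: String, timestamp: Long): Unit

  final def onStart(startTime: Long): Unit = {
    store.recover(startTime).foreach(snapshot => state ++= snapshot) // restore first
    nextCheckpoint = startTime + checkpointInterval
    inited = true
    open()
  }

  final def onNext(key: String, timestamp: Long): Unit = invoke(key, timestamp)

  // Simplification: snapshot everything processed so far once the watermark
  // reaches the next checkpoint time.
  final def onWatermarkProgress(watermark: Long): Unit =
    if (watermark >= nextCheckpoint) {
      store.persist(watermark, state.toMap)
      nextCheckpoint = watermark + checkpointInterval
    }

  // Mirrors the `inited` guard in getStateInternals: no state access before onStart.
  protected final def stateFor(key: String): Long = {
    if (!inited) throw new IllegalStateException("state accessed before onStart")
    state.getOrElse(key, 0L)
  }
}

// Hypothetical word-count style task built on the sketch.
final class CountTask(store: InMemoryCheckpointStore) extends MiniStatefulTask(store, 10L) {
  override def invoke(key: String, timestamp: Long): Unit =
    state(key) = stateFor(key) + 1

  def count(key: String): Long = stateFor(key)
}
```

The ordering is what the sketch mirrors: state is restored before `inited` is set and before `open` runs, so `open` can safely acquire state handles, while any access before `onStart` fails fast.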

[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...

2017-06-22 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/190#discussion_r123429421
  
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala ---

[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...

2017-06-22 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/190#discussion_r123428717
  
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.util.Objects
+
+import org.apache.gearpump.streaming.refactor.coder.Coder
+import org.apache.gearpump.streaming.refactor.state.api.{BagState, MapState, SetState, ValueState}
+
+/**
+ * a utility object for StateSpec
+ */
+object StateSpecs {
+
+  private class ValueStateSpec[T](coder: Coder[T]) extends StateSpec[ValueState[T]] {
+
+    var aCoder: Coder[T] = coder
+
+    override def bind(id: String, binder: StateBinder): ValueState[T] = {
+      binder.bindValue(id, this, aCoder)
+    }
+
+    override def offerCoders(coders: Array[Coder[ValueState[T]]]): Unit = {
+      if (this.aCoder == null) {
+        if (coders(0) != null) {
+          this.aCoder = coders(0).asInstanceOf[Coder[T]]
+        }
+      }
+    }
+
+    override def finishSpecifying: Unit = {
+      if (aCoder == null) throw new IllegalStateException(
+        "Unable to infer a coder for ValueState and no Coder"
+        + " was specified. Please set a coder by either invoking"
+        + " StateSpecs.value(Coder valueCoder) or by registering the coder in the"
+        + " Pipeline's CoderRegistry.")
+    }
+
+    override def equals(obj: Any): Boolean = {
+      if (obj == this) true
--- End diff --

This is not right in Scala: without `return`, the `if` only produces a value that is thrown away, so `true` is never returned. Execution falls through, and the method always ends by evaluating the last line, `Objects.equals(this.aCoder, that.aCoder)`.
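
A minimal, self-contained illustration of the point above (`BrokenSpec`, `FixedSpec` and the `String` field `coder` are hypothetical stand-ins for `ValueStateSpec` and `aCoder`). Because each `if` without `else` is just a discarded expression, the broken version always reaches the cast, which throws for a non-spec argument. The broken version uses `eq` for the identity check to keep the example focused on the missing `return`; the fix writes the whole body as one `match`, a common Scala idiom for `equals`, and also uses `eq` (reference equality) so the identity check does not call back into `equals`.

```scala
import java.util.Objects

// Hypothetical stand-in for ValueStateSpec; `coder` plays the role of `aCoder`.
class BrokenSpec(val coder: String) {
  override def equals(obj: Any): Boolean = {
    if (this eq obj.asInstanceOf[AnyRef]) true // value discarded: no early exit
    if (!obj.isInstanceOf[BrokenSpec]) false   // value discarded: no early exit
    val that = obj.asInstanceOf[BrokenSpec]    // so this cast runs even for other types
    Objects.equals(this.coder, that.coder)
  }
  override def hashCode(): Int = Objects.hashCode(coder)
}

// Fixed: the body is a single expression, so the matching branch is the result.
class FixedSpec(val coder: String) {
  override def equals(obj: Any): Boolean = obj match {
    case that: FixedSpec => (this eq that) || Objects.equals(this.coder, that.coder)
    case _               => false
  }
  override def hashCode(): Int = Objects.hashCode(coder)
}
```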


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...

2017-06-22 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/190#discussion_r123428767
  
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.util.Objects
+
+import org.apache.gearpump.streaming.refactor.coder.Coder
+import org.apache.gearpump.streaming.refactor.state.api.{BagState, MapState, SetState, ValueState}
+
+/**
+ * a utility object for StateSpec
+ */
+object StateSpecs {
+
+  private class ValueStateSpec[T](coder: Coder[T]) extends StateSpec[ValueState[T]] {
+
+    var aCoder: Coder[T] = coder
+
+    override def bind(id: String, binder: StateBinder): ValueState[T] = {
+      binder.bindValue(id, this, aCoder)
+    }
+
+    override def offerCoders(coders: Array[Coder[ValueState[T]]]): Unit = {
+      if (this.aCoder == null) {
+        if (coders(0) != null) {
+          this.aCoder = coders(0).asInstanceOf[Coder[T]]
+        }
+      }
+    }
+
+    override def finishSpecifying: Unit = {
+      if (aCoder == null) throw new IllegalStateException(
+        "Unable to infer a coder for ValueState and no Coder"
+        + " was specified. Please set a coder by either invoking"
+        + " StateSpecs.value(Coder valueCoder) or by registering the coder in the"
+        + " Pipeline's CoderRegistry.")
+    }
+
+    override def equals(obj: Any): Boolean = {
+      if (obj == this) true
+
+      if (!(obj.isInstanceOf[ValueStateSpec[T]])) false
+
+      val that: ValueStateSpec[_] = obj.asInstanceOf[ValueStateSpec[_]]
+      Objects.equals(this.aCoder, that.aCoder)
+    }
+
+    override def hashCode(): Int = {
+      Objects.hashCode(this.aCoder)
+    }
+  }
+
+  private class BagStateSpec[T](coder: Coder[T]) extends StateSpec[BagState[T]] {
+
+    private implicit var elemCoder = coder
+
+    override def bind(id: String, binder: StateBinder): BagState[T] =
+      binder.bindBag(id, this, elemCoder)
+
+    override def offerCoders(coders: Array[Coder[BagState[T]]]): Unit = {
+      if (this.elemCoder == null) {
+        if (coders(0) != null) {
+          this.elemCoder = coders(0).asInstanceOf[Coder[T]]
+        }
+      }
+    }
+
+    override def finishSpecifying: Unit = {
+      if (elemCoder == null) {
+        throw new IllegalStateException("Unable to infer a coder for BagState and no Coder"
+          + " was specified. Please set a coder by either invoking"
+          + " StateSpecs.bag(Coder elemCoder) or by registering the coder in the"
+          + " Pipeline's CoderRegistry.");
+      }
+    }
+
+    override def equals(obj: Any): Boolean = {
+      if (obj == this) true
+
+      if (!obj.isInstanceOf[BagStateSpec[_]]) false
+
+      val that = obj.asInstanceOf[BagStateSpec[_]]
+      Objects.equals(this.elemCoder, that.elemCoder)
+    }
+
+    override def hashCode(): Int = Objects.hash(getClass, elemCoder)
+  }
+
+  private class MapStateSpec[K, V](keyCoder: Coder[K], valueCoder: Coder[V])
+    extends StateSpec[MapState[K, V]] {
+
+    private implicit var kCoder = keyCoder
+    private implicit var vCoder = valueCoder
+
+    override def bind(id: String, binder: StateBinder): MapState[K, V] =
+      binder.bindMap(id, this, keyCoder, valueCoder)
+
+    override def offerCoders(coders: Array[Coder[MapState[K, V]]]): Unit = {
+      if (this.kCoder == null) {
+        if (coders(0) != null) {
+          this.kCoder = coders(0).asInstanceOf[Coder[K]]
+        }
+      }
+
+      if

[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...

2017-06-22 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/190#discussion_r123428739
  
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.util.Objects
+
+import org.apache.gearpump.streaming.refactor.coder.Coder
+import org.apache.gearpump.streaming.refactor.state.api.{BagState, MapState, SetState, ValueState}
+
+/**
+ * a utility object for StateSpec
+ */
+object StateSpecs {
+
+  private class ValueStateSpec[T](coder: Coder[T]) extends StateSpec[ValueState[T]] {
+
+    var aCoder: Coder[T] = coder
+
+    override def bind(id: String, binder: StateBinder): ValueState[T] = {
+      binder.bindValue(id, this, aCoder)
+    }
+
+    override def offerCoders(coders: Array[Coder[ValueState[T]]]): Unit = {
+      if (this.aCoder == null) {
+        if (coders(0) != null) {
+          this.aCoder = coders(0).asInstanceOf[Coder[T]]
+        }
+      }
+    }
+
+    override def finishSpecifying: Unit = {
+      if (aCoder == null) throw new IllegalStateException(
+        "Unable to infer a coder for ValueState and no Coder"
+        + " was specified. Please set a coder by either invoking"
+        + " StateSpecs.value(Coder valueCoder) or by registering the coder in the"
+        + " Pipeline's CoderRegistry.")
+    }
+
+    override def equals(obj: Any): Boolean = {
+      if (obj == this) true
+
+      if (!(obj.isInstanceOf[ValueStateSpec[T]])) false
+
+      val that: ValueStateSpec[_] = obj.asInstanceOf[ValueStateSpec[_]]
+      Objects.equals(this.aCoder, that.aCoder)
+    }
+
+    override def hashCode(): Int = {
+      Objects.hashCode(this.aCoder)
+    }
+  }
+
+  private class BagStateSpec[T](coder: Coder[T]) extends StateSpec[BagState[T]] {
+
+    private implicit var elemCoder = coder
+
+    override def bind(id: String, binder: StateBinder): BagState[T] =
+      binder.bindBag(id, this, elemCoder)
+
+    override def offerCoders(coders: Array[Coder[BagState[T]]]): Unit = {
+      if (this.elemCoder == null) {
+        if (coders(0) != null) {
+          this.elemCoder = coders(0).asInstanceOf[Coder[T]]
+        }
+      }
+    }
+
+    override def finishSpecifying: Unit = {
+      if (elemCoder == null) {
+        throw new IllegalStateException("Unable to infer a coder for BagState and no Coder"
+          + " was specified. Please set a coder by either invoking"
+          + " StateSpecs.bag(Coder elemCoder) or by registering the coder in the"
+          + " Pipeline's CoderRegistry.");
+      }
+    }
+
+    override def equals(obj: Any): Boolean = {
+      if (obj == this) true
--- End diff --

Same issue here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...

2017-06-22 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/190#discussion_r123428752
  
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala ---

[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...

2017-06-22 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/190#discussion_r123429442
  
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.io._
+import java.time.Instant
+import java.util
+import java.util.Map
+
+import com.google.common.collect.Table
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.refactor.coder._
+import org.apache.gearpump.streaming.refactor.state.api.StateInternals
+import org.apache.gearpump.streaming.refactor.state.heap.HeapStateInternalsFactory
+import org.apache.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, UpdateCheckpointClock}
+import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
+import org.apache.gearpump.util.LogUtil
+import org.apache.gearpump.{Message, TimeStamp}
+
+object StatefulTask {
+  val LOG = LogUtil.getLogger(getClass)
+}
+
+/**
+ * stateful task that support state access and all state will be backed in memory
+ * after checkpoint state will persist into storage layer and it will guarantee
+ * 'exactly-once' process semantic
+ *
+ */
+abstract class StatefulTask(taskContext: TaskContext, conf: UserConfig)
+  extends Task(taskContext, conf) {
+
+  import taskContext._
+
+  val checkpointStoreFactory = conf.getValue[CheckpointStoreFactory](
+PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get
+  val checkpointStore = checkpointStoreFactory.getCheckpointStore(
+s"app$appId-task${taskId.processorId}_${taskId.index}")
+  val checkpointInterval = conf.getLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS).get
+  val checkpointManager = new CheckpointManager(checkpointInterval, checkpointStore)
+
+  var inited = false
+
+  // core state data
+  var encodedKeyStateMap: Map[String, Table[String, String, Array[Byte]]] = null
+
+  /**
+   * subclass should override this method if they want to init state api
+   */
+  def open: Unit = {}
+
+  /**
+   * Subclass should override this method to specify how a new message should update state
+   */
+  def invoke(message: Message): Unit
+
+  def close: Unit = {}
+
+  def getStateInternals[KT](keyCoder: Coder[KT], key: KT): StateInternals = {
+if (!inited) {
+  throw new RuntimeException(" please init state access object in `open` method! ")
+}
+if (encodedKeyStateMap == null) {
+  encodedKeyStateMap = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+}
+
+val factory = new HeapStateInternalsFactory[KT](keyCoder, encodedKeyStateMap)
+factory.stateInternalsForKey(key)
+  }
+
+  final override def onStart(startTime: Instant): Unit = {
+// recover state from snapshot
+LOG.info("[onStart] - recover from snapshot")
+val timestamp = startTime.toEpochMilli
+checkpointManager
+  .recover(timestamp)
+  .foreach(recoverState(timestamp, _))
+reportCheckpointClock(timestamp)
+inited = true
+open
+  }
+
+  final override def onNext(message: Message): Unit = {
+checkpointManager.update(message.timestamp.toEpochMilli)
+invoke(message)
+  }
+
+  final override def onWatermarkProgress(watermark: Instant): Unit = {
+if (checkpointManager.shouldCheckpoint(watermark.toEpochMilli)) {
+  checkpointManager.getCheckpointTime.foreach { checkpointTime =>
+val serialized = snapshot
+checkpointManager.checkpoint(checkpointTime, serializ
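
The quoted `onStart` fixes an ordering: recover state from the checkpoint, set `inited`, then call the subclass's `open`, while `getStateInternals` refuses state access before that point and creates the state map lazily. A dependency-free sketch of that ordering follows. `StatefulLifecycleSketch`, `CountingTask`, and the plain Scala maps are hypothetical stand-ins for `StatefulTask`, `CheckpointManager`, and the Guava `Table` state; this is not Gearpump's API:

```scala
import scala.collection.mutable

// Hypothetical model of StatefulTask's lifecycle: recover -> inited -> open()
abstract class StatefulLifecycleSketch {
  private var inited = false
  // Created lazily on first access, mirroring encodedKeyStateMap
  private var stateByKey: mutable.Map[String, mutable.Map[String, Long]] = null

  /** User hook: runs after recovery, so state access is allowed here. */
  def open(): Unit = {}

  def invoke(message: String): Unit

  protected def stateFor(key: String): mutable.Map[String, Long] = {
    if (!inited) {
      throw new IllegalStateException("state is only accessible from open() onward")
    }
    if (stateByKey == null) {
      stateByKey = mutable.HashMap.empty
    }
    stateByKey.getOrElseUpdate(key, mutable.HashMap.empty)
  }

  final def onStart(recovered: Map[String, Map[String, Long]]): Unit = {
    // 1. restore the snapshot (stands in for checkpointManager.recover)
    if (recovered.nonEmpty) {
      stateByKey = mutable.HashMap.empty
      recovered.foreach { case (k, cells) =>
        stateByKey(k) = mutable.HashMap(cells.toSeq: _*)
      }
    }
    // 2. only now is state access legal
    inited = true
    // 3. user initialization
    open()
  }

  final def onNext(message: String): Unit = invoke(message)
}

// Example subclass: counts messages per key
class CountingTask extends StatefulLifecycleSketch {
  override def invoke(message: String): Unit = {
    val cell = stateFor(message)
    cell("count") = cell.getOrElse("count", 0L) + 1
  }

  def count(key: String): Long = stateFor(key).getOrElse("count", 0L)
}
```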

[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...

2017-06-22 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/190#discussion_r123430323
  
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternals.scala ---
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.heap
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
+import java.lang.Iterable
+import java.util
+import java.util.Map.Entry
+import java.util._
+import java.util.Objects
+
+import com.google.common.collect.Table
+import org.apache.gearpump.streaming.refactor.coder.{Coder, ListCoder, MapCoder, SetCoder}
+import org.apache.gearpump.streaming.refactor.state.{StateBinder, StateNamespace, StateSpec, StateTag}
+import org.apache.gearpump.streaming.refactor.state.api._
+import org.apache.gearpump.util.LogUtil
+
+/**
+ *  a heap memory backend StateInternals implementation
+ *  it will cache state in heap memory with a Guava's Table structure
+ *  there are three concept :
+ *  (1) namespace mapping table's row id
+ *  (2) tag mapping table's field
+ *  (3) state mapping table's cell value
+ */
+class HeapStateInternals[K](key: K, stateTable: Table[String, String, Array[Byte]])
+  extends StateInternals {
+
+  val LOG = LogUtil.getLogger(getClass)
+
+  private val k: K = key
--- End diff --

redundant
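
As background for this remark (and the matching one on `InMemoryGlobalStateInternals`): a Scala constructor parameter that is used inside the class body is already kept as a private field, so `private val k: K = key` only adds a second name for the same value. A minimal sketch with hypothetical class names:

```scala
// Hypothetical classes, not Gearpump's: `key` is used directly in a method,
// so the compiler keeps it as a private field without an explicit alias.
class KeyedState[K](key: K) {
  def describe: String = s"state for key $key"
}

// If callers need the key, expose the parameter itself instead of copying it.
class ExposedKeyedState[K](val key: K)
```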


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...

2017-06-22 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/190#discussion_r123429491
  
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.io._
+import java.time.Instant
+import java.util
+import java.util.Map
+
+import com.google.common.collect.Table
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.refactor.coder._
+import org.apache.gearpump.streaming.refactor.state.api.StateInternals
+import org.apache.gearpump.streaming.refactor.state.heap.HeapStateInternalsFactory
+import org.apache.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, UpdateCheckpointClock}
+import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
+import org.apache.gearpump.util.LogUtil
+import org.apache.gearpump.{Message, TimeStamp}
+
+object StatefulTask {
+  val LOG = LogUtil.getLogger(getClass)
+}
+
+/**
+ * stateful task that support state access and all state will be backed in memory
+ * after checkpoint state will persist into storage layer and it will guarantee
+ * 'exactly-once' process semantic
+ *
+ */
+abstract class StatefulTask(taskContext: TaskContext, conf: UserConfig)
+  extends Task(taskContext, conf) {
+
+  import taskContext._
+
+  val checkpointStoreFactory = conf.getValue[CheckpointStoreFactory](
+PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get
+  val checkpointStore = checkpointStoreFactory.getCheckpointStore(
+s"app$appId-task${taskId.processorId}_${taskId.index}")
+  val checkpointInterval = conf.getLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS).get
+  val checkpointManager = new CheckpointManager(checkpointInterval, checkpointStore)
+
+  var inited = false
+
+  // core state data
+  var encodedKeyStateMap: Map[String, Table[String, String, Array[Byte]]] = null
+
+  /**
+   * subclass should override this method if they want to init state api
+   */
+  def open: Unit = {}
+
+  /**
+   * Subclass should override this method to specify how a new message should update state
+   */
+  def invoke(message: Message): Unit
+
+  def close: Unit = {}
+
+  def getStateInternals[KT](keyCoder: Coder[KT], key: KT): StateInternals = {
+if (!inited) {
+  throw new RuntimeException(" please init state access object in `open` method! ")
+}
+if (encodedKeyStateMap == null) {
+  encodedKeyStateMap = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+}
+
+val factory = new HeapStateInternalsFactory[KT](keyCoder, encodedKeyStateMap)
+factory.stateInternalsForKey(key)
+  }
+
+  final override def onStart(startTime: Instant): Unit = {
+// recover state from snapshot
+LOG.info("[onStart] - recover from snapshot")
+val timestamp = startTime.toEpochMilli
+checkpointManager
+  .recover(timestamp)
+  .foreach(recoverState(timestamp, _))
+reportCheckpointClock(timestamp)
+inited = true
+open
+  }
+
+  final override def onNext(message: Message): Unit = {
+checkpointManager.update(message.timestamp.toEpochMilli)
+invoke(message)
+  }
+
+  final override def onWatermarkProgress(watermark: Instant): Unit = {
+if (checkpointManager.shouldCheckpoint(watermark.toEpochMilli)) {
+  checkpointManager.getCheckpointTime.foreach { checkpointTime =>
+val serialized = snapshot
+checkpointManager.checkpoint(checkpointTime, serializ

[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...

2017-06-22 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/190#discussion_r123430243
  
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/InMemoryGlobalStateInternals.scala ---
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.util
+import java.util.Map.Entry
+import java.util.{ArrayList, HashSet, List, Set}
+import java.lang.Iterable
+
+import com.google.common.collect.{HashBasedTable, Table}
+import org.apache.gearpump.streaming.refactor.coder.Coder
+import org.apache.gearpump.streaming.refactor.state.InMemoryGlobalStateInternals.InMemoryStateBinder
+import org.apache.gearpump.streaming.refactor.state.api._
+
+/**
+ * a no namespace (global) in memory state internal.
+ * it is a default implementation inspired by Apache Beam.
+ * but do not recommend use it!
+ */
+class InMemoryGlobalStateInternals[K] protected(key: K) extends StateInternals {
+
+  private val k: K = key
--- End diff --

Looks like `k` is redundant since you already have `key`.


---


[GitHub] incubator-gearpump issue #189: [Gearpump 311] refactor state management

2017-06-21 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/189
  
Hi @yanghua, thanks for your contribution! This is a huge pull request, so I think it will take a while to review fully. Also, could you add a description of this pull request covering its main idea or design? That would be a great help, thanks!


---


[GitHub] incubator-gearpump issue #188: [GEARPUMP-318] Exclude maven-plugin-api depen...

2017-06-20 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/188
  
+1


---


[GitHub] incubator-gearpump pull request #187: [GEARPUMP-317] Fix Task minClock

2017-06-18 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/187#discussion_r122588303
  
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala ---
@@ -118,25 +126,34 @@ class DefaultWindowRunner[IN, OUT](
 }
   })
   fnRunner.finish().foreach {
-out: OUT => outputs += TimestampedValue(out, firstWin.endTime.minusMillis(1))
+out: OUT =>
+  outputs += TimestampedValue(out, firstWin.endTime.minusMillis(1))
   }
+  val newWmk = TaskUtil.max(wmk, firstWin.endTime)
   if (windows.accumulationMode == Discarding) {
 fnRunner.teardown()
 setup = false
 // discarding, setup need to be called for each window
-onTrigger(outputs)
+onTrigger(outputs, newWmk)
--- End diff --

Move `onTrigger` outside of the `if else`
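
A minimal sketch of the suggested restructuring, assuming (the `else` branch is truncated above) that both branches end by calling `onTrigger(outputs, newWmk)`. `SketchRunner`, its fields, and the `Long` watermark are hypothetical simplifications of `DefaultWindowRunner`:

```scala
import scala.collection.mutable.ArrayBuffer

sealed trait AccumulationMode
case object Discarding extends AccumulationMode
case object Accumulating extends AccumulationMode

// Hypothetical simplification of DefaultWindowRunner's trigger step
class SketchRunner(mode: AccumulationMode) {
  var setup = true
  val triggered = ArrayBuffer.empty[(List[String], Long)]

  private def onTrigger(outputs: List[String], watermark: Long): Unit =
    triggered += ((outputs, watermark))

  def fire(outputs: List[String], newWmk: Long): Unit = {
    if (mode == Discarding) {
      // discarding: tear down so setup runs again for the next window
      setup = false
    }
    // shared by both accumulation modes, so it is called once after the branch
    onTrigger(outputs, newWmk)
  }
}
```

Keeping only the mode-specific work inside the branch means the trigger call cannot drift out of sync between the two paths.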


---


[GitHub] incubator-gearpump pull request #187: [GEARPUMP-317] Fix Task minClock

2017-06-18 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/187#discussion_r122588434
  
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala ---
@@ -40,7 +41,7 @@ class Subscription(
 executorId: Int,
 taskId: TaskId,
 subscriber: Subscriber, sessionId: Int,
-transport: ExpressTransport,
+sender: TaskActor,
--- End diff --

I think `sender` here is just the current task actor, whereas the name "sender" literally suggests the upstream task actor.


---


[GitHub] incubator-gearpump pull request #186: [GEARPUMP-316] Decouple groupBy from w...

2017-06-06 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/186#discussion_r120526897
  
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala ---
@@ -32,35 +32,34 @@ object WindowFunction {
   }
 }
 
-trait WindowFunction[T] {
+trait WindowFunction {
 
-  def apply(context: WindowFunction.Context[T]): Array[Window]
+  def apply[T](context: WindowFunction.Context[T]): Array[Window]
 
   def isNonMerging: Boolean
 }
 
-abstract class NonMergingWindowFunction[T] extends WindowFunction[T] {
+abstract class NonMergingWindowFunction extends WindowFunction {
 
   override def isNonMerging: Boolean = true
 }
 
-case class GlobalWindowFunction[T]() extends NonMergingWindowFunction[T] {
+case class GlobalWindowFunction() extends NonMergingWindowFunction {
 
-  override def apply(context: WindowFunction.Context[T]): Array[Window] = {
+  override def apply[T](context: WindowFunction.Context[T]): Array[Window] = {
 Array(Window(Instant.ofEpochMilli(MIN_TIME_MILLIS),
--- End diff --

We can create just one static object; there is no need to create a new one each time, since it's immutable.
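
A minimal sketch of that suggestion: build the immutable global `Window` once in an object and reuse it on every call. `Window`, `GlobalWindowSketch`, and the `Long.MinValue`/`Long.MaxValue` bounds are assumptions standing in for Gearpump's `Window` and its time-bound constants such as `MIN_TIME_MILLIS`:

```scala
import java.time.Instant

// Hypothetical stand-in for gearpump's immutable Window
final case class Window(startTime: Instant, endTime: Instant)

object GlobalWindowSketch {
  // Assumed bounds for illustration only
  private val MinTimeMillis = Long.MinValue
  private val MaxTimeMillis = Long.MaxValue

  // Created once: Window is immutable, so all calls can share this instance
  val globalWindow: Window =
    Window(Instant.ofEpochMilli(MinTimeMillis), Instant.ofEpochMilli(MaxTimeMillis))

  // Array is mutable, so each call still gets its own one-element array
  def apply[T](context: T): Array[Window] = Array(globalWindow)
}
```

Only the `Window` is shared; returning a fresh array keeps one caller's writes to the array from leaking into another's.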


---


[GitHub] incubator-gearpump issue #186: [GEARPUMP-316] Decouple groupBy from window

2017-06-06 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/186
  
First, an unrelated comment: there is a class `org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner`, which looks like it is in the wrong place.


---


[GitHub] incubator-gearpump pull request #186: [GEARPUMP-316] Decouple groupBy from w...

2017-06-06 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/186#discussion_r120526614
  
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+import java.util.function.Consumer
+
+import com.gs.collections.impl.map.mutable.UnifiedMap
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants.{GEARPUMP_STREAMING_GROUPBY_FUNCTION, GEARPUMP_STREAMING_OPERATOR}
+import org.apache.gearpump.streaming.dsl.window.impl.WindowRunner
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+
+/**
+ * Processes messages in groups as defined by groupBy function.
+ */
+class GroupByTask[IN, GROUP, OUT](
+groupBy: IN => GROUP,
+taskContext: TaskContext,
+userConfig: UserConfig) extends Task(taskContext, userConfig) {
+
+  def this(context: TaskContext, conf: UserConfig) = {
+this(
+  conf.getValue[IN => GROUP](GEARPUMP_STREAMING_GROUPBY_FUNCTION)(context.system).get,
+  context, conf
+)
+  }
+
+  private val groups: UnifiedMap[GROUP, WindowRunner[IN, OUT]] =
+new UnifiedMap[GROUP, WindowRunner[IN, OUT]]
+
+  override def onNext(message: Message): Unit = {
+val input = message.value.asInstanceOf[IN]
+val group = groupBy(input)
+
+if (!groups.containsKey(group)) {
+  groups.put(group,
+userConfig.getValue[WindowRunner[IN, OUT]](
+  GEARPUMP_STREAMING_OPERATOR)(taskContext.system).get)
+}
+
+groups.get(group).process(input, message.timestamp)
+  }
+
+  override def onWatermarkProgress(watermark: Instant): Unit = {
+groups.values.forEach(new Consumer[WindowRunner[IN, OUT]] {
--- End diff --

Looks like you just create a consumer but will it be called?
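
For reference on this question: `java.util.Collection.forEach` does call `accept` once per element when an anonymous `Consumer` is passed to it directly, as the quoted line appears to do; whether the truncated body does anything more cannot be seen here. A small check, with a plain `java.util.HashMap` standing in for the GS Collections `UnifiedMap` and strings standing in for `WindowRunner`s (both assumptions):

```scala
import java.util.function.Consumer
import scala.collection.mutable.ArrayBuffer

object ForEachSketch {
  def visitedRunners(): List[String] = {
    val groups = new java.util.HashMap[String, String]()
    groups.put("a", "runner-a")
    groups.put("b", "runner-b")

    val visited = ArrayBuffer.empty[String]
    // The anonymous Consumer is not merely created: forEach calls accept() for every value
    groups.values().forEach(new Consumer[String] {
      override def accept(runner: String): Unit = visited += runner
    })
    // HashMap iteration order is unspecified, so sort before returning
    visited.sorted.toList
  }
}
```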


---


[GitHub] incubator-gearpump issue #185: [GEARPUMP-315] Add GlobalWindows and implemen...

2017-05-27 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/185
  
+1


---


[GitHub] incubator-gearpump issue #184: [GEARPUMP-308] Fix TransformTask output time

2017-05-18 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/184
  
+1


---


[GitHub] incubator-gearpump pull request #183: [GEARPUMP-312] Add Message trait and D...

2017-05-18 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/183#discussion_r117160623
  
--- Diff: integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DefaultMessageDeliverySpec.scala ---
@@ -28,7 +28,7 @@ import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
 /**
  * Checks message delivery consistency, like at-least-once, and exactly-once.
  */
-class MessageDeliverySpec extends TestSpecBase {
+class DefaultMessageDeliverySpec extends TestSpecBase {
--- End diff --

Do we need to change this name?


---


[GitHub] incubator-gearpump issue #183: [GEARPUMP-312] Add Message trait and DefaultM...

2017-05-18 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/183
  
+1


---


[GitHub] incubator-gearpump issue #183: [GEARPUMP-312] Add Message trait and DefaultM...

2017-05-17 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/183
  
Document update?


---


[GitHub] incubator-gearpump issue #183: [GEARPUMP-312] Add Message trait and DefaultM...

2017-05-17 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/183
  
Can we, and should we, add an `apply` function to the companion object of trait `Message` that returns a `DefaultMessage`?
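
One possible shape for such a factory, as a sketch only: the members `value` and `timestamp` are assumptions, since the `Message` trait from this pull request is not quoted here:

```scala
import java.time.Instant

// Hypothetical members; the actual trait in the pull request may differ
trait Message {
  def value: Any
  def timestamp: Instant
}

final case class DefaultMessage(value: Any, timestamp: Instant) extends Message

object Message {
  // Lets callers write Message(v, t) without naming the default implementation
  def apply(value: Any, timestamp: Instant): Message = DefaultMessage(value, timestamp)
}
```

Declaring the return type as `Message` rather than `DefaultMessage` keeps call sites independent of the concrete class if the default implementation changes later.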


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-gearpump issue #182: [GEARPUMP-307] Fix TransformTask$Transform in...

2017-05-09 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/182
  
+1


---


[GitHub] incubator-gearpump issue #181: [GEARPUMP-305] Add Internals section to docs

2017-05-08 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/181
  
+1


---


[GitHub] incubator-gearpump pull request #180: [GEARPUMP-303] add a RabbitMQ sink to ...

2017-05-07 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/180#discussion_r115162272
  
--- Diff: experiments/rabbitmq/src/test/scala/org/apache/gearpump/experimental/rabbitmq/RabbitmqSinkSpec.scala ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experimental.rabbitmq
+
+import com.rabbitmq.client.AMQP.Connection
+import com.rabbitmq.client.ConnectionFactory
+import org.apache.gearpump.Message
--- End diff --

Also, it would be better to have no unused imports.


---


[GitHub] incubator-gearpump pull request #180: [GEARPUMP-303] add a RabbitMQ sink to ...

2017-05-07 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/180#discussion_r115161088
  
--- Diff: experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experimental.rabbitmq
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.TaskContext
+import com.rabbitmq.client.Channel
+import com.rabbitmq.client.{Connection, ConnectionFactory}
+
+class RMQSink(userConfig: UserConfig,
+val connFactory: (UserConfig) => ConnectionFactory) extends DataSink{
+
+  var connectionFactory: ConnectionFactory = connFactory(userConfig)
+  var connection: Connection = null
+  var channel: Channel = null
+  var queueName: String = null
+
+  def this(userConfig: UserConfig) = {
+this(userConfig, RMQSink.getConnectionFactory)
+  }
+
+  override def open(context: TaskContext): Unit = {
+connection = connectionFactory.newConnection
+channel = connection.createChannel
+if (channel == null) {
+  throw new RuntimeException("None of RabbitMQ channels are available.")
+}
+setupQueue()
+  }
+
+  override def write(message: Message): Unit = {
+publish(message.msg)
+  }
+
+  override def close(): Unit = {
+channel.close()
+connection.close()
+  }
+
+  protected def setupQueue(): Unit = {
+val queue = RMQSink.getQueueName(userConfig)
+if (queue.isEmpty) {
+  throw new RuntimeException("can not get a RabbitMQ queue name")
+}
+
+queueName = queue.get
+channel.queueDeclare(queue.get, false, false, false, null)
+  }
+
+  def publish(msg: Any): Unit = {
+msg match {
+  case seq: Seq[Any] =>
+seq.foreach(publish)
+  case str: String => {
+channel.basicPublish("", queueName, null, msg.asInstanceOf[String].getBytes)
+  }
+  case byteArray: Array[Byte] => {
+channel.basicPublish("", queueName, null, byteArray)
+  }
+  case _ => {
--- End diff --

Maybe we can log a warning here.
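
A minimal sketch of `publish` with a warning in the fallback case. To stay self-contained, `send` stands in for `channel.basicPublish("", queueName, null, bytes)` and `warn` for a logger such as the one `LogUtil.getLogger` returns; the explicit UTF-8 charset is also an assumption, as the quoted code uses the platform-default `getBytes`:

```scala
import java.nio.charset.StandardCharsets

// Hypothetical, RabbitMQ-free version of RMQSink.publish
class PublishSketch(send: Array[Byte] => Unit, warn: String => Unit) {
  def publish(msg: Any): Unit = msg match {
    case seq: Seq[_] => seq.foreach(publish)
    case str: String => send(str.getBytes(StandardCharsets.UTF_8))
    case bytes: Array[Byte] => send(bytes)
    // type patterns never match null, so handle it before calling getClass
    case null => warn("Dropping null message")
    case other => warn(s"Dropping message of unsupported type ${other.getClass.getName}")
  }
}
```

Including the runtime class name in the warning makes it easier to see which upstream task emits unexpected values.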


---


[GitHub] incubator-gearpump issue #178: [GEARPUMP-303] add a RabbitMQ sink to integra...

2017-05-07 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/178
  
Hi @yanghua, maybe you can close this one now :)


---


[GitHub] incubator-gearpump pull request #178: [GEARPUMP-303] add a RabbitMQ sink to ...

2017-05-05 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/178#discussion_r114961959
  
--- Diff: external/rabbitmq/src/main/scala/org/apache/gearpump/external/rabbitmq/RMQSink.scala ---
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.external.rabbitmq
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.TaskContext
+import com.rabbitmq.client.Channel
+import com.rabbitmq.client.{Connection, ConnectionFactory}
+
+class RMQSink(userConfig: UserConfig) extends DataSink{
+
+  var connection: Connection = null
+  var channel: Channel = null
+  var queueName: String = null
+
+  override def open(context: TaskContext): Unit = {
+    val factory : ConnectionFactory = RMQSink.getConnectionFactory(userConfig)
+    connection = factory.newConnection
+    channel = connection.createChannel
+    if (channel == null) {
+      throw new RuntimeException("None of RabbitMQ channels are available.")
+    }
+    setupQueue()
+  }
+
+  override def write(message: Message): Unit = {
+    publish(message.msg)
+  }
+
+  override def close(): Unit = {
+    channel.close()
+    connection.close()
+  }
+
+  protected def setupQueue(): Unit = {
+    val queue = RMQSink.getQueueName(userConfig)
+    if (!queue.nonEmpty) {
+      throw new RuntimeException("can not get a RabbitMQ queue name")
+    }
+
+    queueName = queue.get
+    channel.queueDeclare(queue.get, false, false, false, null)
+  }
+
+  def publish(msg: Any): Unit = {
+    msg match {
+      case seq: Seq[Any] =>
+        seq.foreach(publish)
+      case str: String => {
+        channel.basicPublish("", queueName, null, msg.asInstanceOf[String].getBytes)
+      }
+      case byteArray: Array[Byte@unchecked] => {
+        channel.basicPublish("", queueName, null, byteArray)
+      }
+      case _ => {
+
+      }
+    }
+  }
+
+}
+
+object RMQSink {
+
+  val RMQSINK = "rmqsink"
+  val QUEUE_NAME = "rabbitmq.queue.name"
+  val SERVER_HOST = "rabbitmq.connection.host"
+  val SERVER_PORT = "rabbitmq.connection.port"
+  val CONNECTION_URI = "rabbitmq.connection.uri"
+  val VIRTUAL_HOST = "rabbitmq.virtualhost"
+  val AUTH_USERNAME = "rabbitmq.auth.username"
+  val AUTH_PASSWORD = "rabbitmq.auth.password"
+  val AUTOMATIC_RECOVERY = "rabbitmq.automatic.recovery"
+  val CONNECTION_TIMEOUT = "rabbitmq.connection.timeout"
+  val NETWORK_RECOVERY_INTERVAL = "rabbitmq.network.recovery.interval"
+  val REQUESTED_HEARTBEAT = "rabbitmq.requested.heartbeat"
+  val TOPOLOGY_RECOVERY_ENABLED = "rabbitmq.topology.recoveryenabled"
+  val REQUESTED_CHANNEL_MAX = "rabbitmq.channel.max"
+  val REQUESTED_FRAME_MAX = "rabbitmq.frame.max"
+
+  def getConnectionFactory(userConfig : UserConfig): ConnectionFactory = {
+    val factory : ConnectionFactory = new ConnectionFactory;
--- End diff --

No need for the semicolon here.


---


[GitHub] incubator-gearpump pull request #178: [GEARPUMP-303] add a RabbitMQ sink to ...

2017-05-05 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/178#discussion_r114962623
  
--- Diff: external/rabbitmq/README.md ---
@@ -0,0 +1,21 @@
+# Gearpump RabbitMQ
+
+Gearpump integration for [RabbitMQ](https://www.rabbitmq.com/)
+
+## Usage
+
+The message type that RMQSink is able to handle including:
+
+ 1. String
+ 2. Array[Byte]
+ 3. Sequence of type 1 and 2
+
+Suppose there is a DataSource Task will output above-mentioned messages, you can write a simple application then:
+
+```scala
+val sink = new RMQSink(UserConfig.empty, "$tableName")
+val sinkProcessor = DataSinkProcessor(sink, "$sinkNum")
+val split = Processor[DataSource]("$splitNum")
+val computation = split ~> sinkProcessor
+val application = StreamApplication("RabbitMQ", Graph(computation), UserConfig.empty)
+```
--- End diff --

We probably also need some configuration documentation here, e.g. for `rabbitmq.connection.host`.
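As a possible starting point for that documentation, here is a hedged sketch of how a few of the keys declared in the `RMQSink` companion object could be read and validated. It uses a plain `Map[String, String]` instead of Gearpump's `UserConfig` so it runs on its own; `RmqSettings` and `fromConfig` are hypothetical names, and the fallbacks (host `localhost`, port `5672`, RabbitMQ's standard AMQP port) are assumptions, not what `RMQSink.getConnectionFactory` is known to do.

```scala
import scala.util.Try

// Hypothetical sketch: read a subset of the rabbitmq.* keys declared in
// object RMQSink. A plain Map stands in for Gearpump's UserConfig.
final case class RmqSettings(
    host: String,
    port: Int,
    virtualHost: Option[String],
    username: Option[String],
    queueName: String)

object RmqSettings {
  // Key names copied from the RMQSink companion object in the diff.
  val QueueName = "rabbitmq.queue.name"
  val ServerHost = "rabbitmq.connection.host"
  val ServerPort = "rabbitmq.connection.port"
  val VirtualHost = "rabbitmq.virtualhost"
  val AuthUsername = "rabbitmq.auth.username"

  // Assumed defaults; the real sink may behave differently.
  val DefaultHost = "localhost"
  val DefaultPort = 5672 // standard AMQP port

  def fromConfig(conf: Map[String, String]): Either[String, RmqSettings] = {
    val queue = conf.get(QueueName).filter(_.nonEmpty)
    // None here means the port key is present but not a valid integer.
    val port = conf.get(ServerPort) match {
      case None => Some(DefaultPort)
      case Some(p) => Try(p.trim.toInt).toOption
    }
    (queue, port) match {
      case (None, _) => Left(s"missing required key $QueueName")
      case (_, None) => Left(s"$ServerPort must be an integer")
      case (Some(q), Some(p)) =>
        Right(RmqSettings(
          host = conf.getOrElse(ServerHost, DefaultHost),
          port = p,
          virtualHost = conf.get(VirtualHost),
          username = conf.get(AuthUsername),
          queueName = q))
    }
  }
}
```

Once the real defaults are confirmed, the README could list each key with its meaning and default in the same way.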


---


[GitHub] incubator-gearpump pull request #178: [GEARPUMP-303] add a RabbitMQ sink to ...

2017-05-05 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/178#discussion_r114962293
  
--- Diff: external/rabbitmq/src/test/scala/org/apache/gearpump/external/rabbitmq/RabbitmqSinkSpec.scala ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.external.rabbitmq
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+class RabbitmqSinkSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+
+  property("HBaseSink should insert a row successfully") {
--- End diff --

The property description is not correct: this spec tests the RabbitMQ sink, not `HBaseSink`.


---


[GitHub] incubator-gearpump issue #174: [GEARPUMP-295] Fix the failure of getting sta...

2017-04-20 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/174
  
@manuzhang 


---


[GitHub] incubator-gearpump pull request #174: [GEARPUMP-295] Fix the failure of gett...

2017-04-14 Thread huafengw
GitHub user huafengw opened a pull request:

https://github.com/apache/incubator-gearpump/pull/174

[GEARPUMP-295] Fix the failure of getting stalling tasks from clock service

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the commit message is formatted like:
   `[GEARPUMP-] Meaningful description of pull request` 
 - [ ] Make sure tests pass via `sbt clean test`.
 - [ ] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huafengw/incubator-gearpump stalling

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-gearpump/pull/174.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #174


commit 3daeaccc1f1f7967bab95f78fdf1430bf82849fc
Author: huafengw <fvunic...@gmail.com>
Date:   2017-04-14T06:43:24Z

[GEARPUMP-295] Fix the failure of getting stalling tasks from clock service




---


[GitHub] incubator-gearpump issue #170: [GEARPUMP-291] Refactor TaskActor

2017-03-10 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/170
  
+1


---


[GitHub] incubator-gearpump issue #169: [GEARPUMP-285] fix false alarm of shutting do...

2017-03-09 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/169
  
@manuzhang 


---


[GitHub] incubator-gearpump issue #169: [GEARPUMP-285] fix false alarm of shutting do...

2017-03-09 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/169
  
I'll refactor the status implementation into a simple state machine.
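A hedged illustration of what such a simple state machine could look like. The states, events and function names below are hypothetical and not taken from the Gearpump code base; the idea is that every transition is explicit, so a shutdown-timeout check can consult the current state instead of inferring it, which is one plausible way to avoid a false timeout alarm.

```scala
// Hypothetical executor lifecycle as an explicit state machine; names are
// illustrative and not taken from the Gearpump code base.
sealed trait ExecutorState
case object Launching extends ExecutorState
case object Running extends ExecutorState
case object ShuttingDown extends ExecutorState
case object Stopped extends ExecutorState

sealed trait ExecutorEvent
case object Registered extends ExecutorEvent
case object ShutdownRequested extends ExecutorEvent
case object Terminated extends ExecutorEvent

object ExecutorStateMachine {
  // Next state for a given event, or None if the event is not valid in `state`.
  def transition(state: ExecutorState, event: ExecutorEvent): Option[ExecutorState] =
    (state, event) match {
      case (Launching, Registered) => Some(Running)
      case (Launching | Running, ShutdownRequested) => Some(ShuttingDown)
      case (Launching | Running | ShuttingDown, Terminated) => Some(Stopped)
      case _ => None
    }

  // A shutdown timeout is only worth reporting if the executor is still
  // ShuttingDown when the timer fires; once Stopped, it would be a false alarm.
  def shouldReportShutdownTimeout(current: ExecutorState): Boolean =
    current == ShuttingDown
}
```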


---


[GitHub] incubator-gearpump issue #169: [GEARPUMP-285] fix false alarm of shutting do...

2017-03-09 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/169
  
Tests pass: https://travis-ci.org/huafengw/incubator-gearpump/builds/209307018


---


[GitHub] incubator-gearpump pull request #169: [GEARPUMP-285] fix false alarm of shut...

2017-03-09 Thread huafengw
GitHub user huafengw opened a pull request:

https://github.com/apache/incubator-gearpump/pull/169

[GEARPUMP-285] fix false alarm of shutting down executor time out

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the commit message is formatted like:
   `[GEARPUMP-] Meaningful description of pull request` 
 - [ ] Make sure tests pass via `sbt clean test`.
 - [ ] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huafengw/incubator-gearpump timeout

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-gearpump/pull/169.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #169


commit 22decc4435cec306fe28c3512a930959366b985c
Author: huafengw <fvunic...@gmail.com>
Date:   2017-03-09T10:56:06Z

[GEARPUMP-285] fix false alarm of shutting down executor time out




---


[GitHub] incubator-gearpump issue #165: [GEARPUMP-287] Trigger data process on waterm...

2017-03-02 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/165
  
+1


---


[GitHub] incubator-gearpump issue #162: [GEARPUMP-281] Using new version of docker im...

2017-03-01 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/162
  
Hi Karol, would you mind making a small change in `org.apache.gearpump.integrationtest.Docker.scala`, changing the implementation of `getNetworkGateway` to:
```scala
  final def getNetworkGateway(container: String): String = {
    trace(container, s"Get gateway of container...") {
      Docker.inspect(container, "--format={{.NetworkSettings.Gateway}}")
    }
  }
```

Then rerun the integration tests in your Ubuntu environment to check whether the change works?
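For readers outside the test harness, a hedged standalone equivalent of the proposed `getNetworkGateway` using `scala.sys.process` is sketched here. It is not the integration-test `Docker` helper itself (its `inspect` and `trace` helpers are not shown in this thread); `GatewaySketch` and its methods are hypothetical names, and only the `--format={{.NetworkSettings.Gateway}}` template comes from the suggestion.

```scala
import scala.sys.process._

// Hypothetical standalone equivalent of the suggested getNetworkGateway.
object GatewaySketch {
  // Build argv explicitly, so the {{...}} Go template needs no shell quoting.
  def inspectGatewayCommand(container: String): Seq[String] =
    Seq("docker", "inspect", "--format={{.NetworkSettings.Gateway}}", container)

  // Runs the command; !! returns stdout and throws if docker exits non-zero.
  def networkGateway(container: String): String =
    inspectGatewayCommand(container).!!.trim
}
```

Calling `GatewaySketch.networkGateway("master0")` requires a local Docker daemon and a running container with that name.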


---


[GitHub] incubator-gearpump issue #162: [GEARPUMP-281] Using new version of docker im...

2017-03-01 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/162
  
+1 Merging
Thanks Karol!


---


[GitHub] incubator-gearpump issue #158: [GEARPUMP-267] Changing docker image for Kafk...

2017-02-28 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/158
  
Hi Karol, this pull request includes the former GEARPUMP-236, and it is also covered by your other PR, so maybe we can close this one?


---


[GitHub] incubator-gearpump pull request #162: [GEARPUMP-281] Using new version of do...

2017-02-28 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/162#discussion_r103382701
  
--- Diff: integrationtest/docker/README.md ---
@@ -1,28 +1,23 @@
-# Gearpump Launcher Docker Image
-
-The image helps developer to setup/test a local [Gearpump](https://github.com/apache/incubator-gearpump.git) cluster quickly. The image is based on a minimal JRE8 environment with Python support.
-
-## Usage
-
-Here are the commands to launch a cluster. You can launch as many worker containers as you wish but only one master for the time being.
-```
-export GEARPUMP_HOME=/path/to/gearpump
-
-docker run -d \
- -h master0 --name master0 \
- -v $GEARPUMP_HOME:/opt/gearpump \
- -e JAVA_OPTS=-Dgearpump.cluster.masters.0=master0:3000 \
- -p 8090:8090 \
- stanleyxu2005/gearpump-launcher \
- master -ip master0 -port 3000
-
-docker run -d \
- --link master0 \
- -v $GEARPUMP_HOME:/opt/gearpump \
- -e JAVA_OPTS=-Dgearpump.cluster.masters.0=master0:3000 \
- stanleyxu2005/gearpump-launcher \
- worker
-
-docker exec master0 gear info
-docker exec master0 gear app -jar /path/to/userapp.jar [mainclass] [args]
-```
+This folder contains docker images definitions used in integration tests of Gearpump.
+
+
+These include:
+
+ * [The standalone single node Kafka cluster with Zookeeper](/kafka)
+ * [The Hadoop image](/hadoop)
+ * [The Gearpump Cluster Launcher and Storm Client](/gearpump)
+ * [Java 8 JRE image](/java)
+
+
+We decided to fork spotify/kafka image, because the project does not maintain proper tagging.
+For now our tests focus on Kafka 0.8.x, and the project does not support this version.
+
+Hadoop docker image (https://hub.docker.com/r/sequenceiq/hadoop-docker/) is well maintained.
+We rely on version 2.6.0 and we feel there is no need to duplicate it.
+
+Gearpump Cluster Launcher  helps developer to setup/test a local Gearpump cluster quickly.
+The image is based on a minimal JRE8 environment with Python support.
+
+We used to base Gearpump Cluster Launcher on errordeveloper/oracle-jre image but it stuck on version 8u66-b17 and doesn't support tagging.
+We also probably hit a Java bug (http://bugs.java.com/bugdatabase/view_bug.do?bug_id=8133205) that requires never version of JRE.
--- End diff --

Typo: 'never' => 'newer'


---


[GitHub] incubator-gearpump pull request #162: [GEARPUMP-281] Using new version of do...

2017-02-28 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/162#discussion_r103382851
  
--- Diff: integrationtest/docker/java/README.md ---
@@ -0,0 +1,18 @@
+A minimalistic Oracle JDK 8 container on top of busybox.
+
+We used to base Gearpump Cluster Launcher on errordeveloper/oracle-jre image but it stuck on version 8u66-b17 and doesn't support tagging.
+We also probably hit a Java bug (http://bugs.java.com/bugdatabase/view_bug.do?bug_id=8133205) that requires never version of JRE.
--- End diff --

Same typo here ('never' => 'newer').


---


[GitHub] incubator-gearpump pull request #162: [GEARPUMP-281] Using new version of do...

2017-02-28 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/162#discussion_r103385452
  
--- Diff: integrationtest/docker/kafka/README.md ---
@@ -0,0 +1,89 @@
+Kafka in Docker
+===
+
+**This is a fork spotify/kafka image. We decided to make it, because the project does not maintain proper tagging.
+For now our tests focus on Kafka 0.8.x, so here is a version that supports Kafka 0.8.x.
+For latest version of kafka image go to the original project.**
+   
+This repository provides everything you need to run Kafka in Docker.
+
+For convenience also contains a packaged proxy that can be used to get data from
+a legacy Kafka 7 cluster into a dockerized Kafka 8.
+
+Why?
+---
+The main hurdle of running Kafka in Docker is that it depends on Zookeeper.
+Compared to other Kafka docker images, this one runs both Zookeeper and Kafka
+in the same container. This means:
+
+* No dependency on an external Zookeeper host, or linking to another container
+* Zookeeper and Kafka are configured to work together out of the box
+
+Run
+---
+
+```bash
+docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`docker-machine ip \`docker-machine active\`` --env ADVERTISED_PORT=9092 grubykarol/kafka
--- End diff --

Does it require docker-machine?


---


[GitHub] incubator-gearpump issue #161: [GEARPUMP-283] Return app exception to client

2017-02-27 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/161
  
+1


---

