[jira] [Commented] (GEARPUMP-331) Allow examples to be ran in IDE

2017-07-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/GEARPUMP-331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16104491#comment-16104491
 ] 

ASF GitHub Bot commented on GEARPUMP-331:
-

Github user manuzhang commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/202#discussion_r130017080
  
--- Diff: 
core/src/main/scala/org/apache/gearpump/cluster/embedded/LocalRuntimeEnvironemnt.scala
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.embedded
+
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.client.{ClientContext, RuntimeEnvironment}
+import org.apache.gearpump.cluster.embedded.LocalRuntimeEnvironemnt.LocalClientContext
+
+class LocalRuntimeEnvironemnt extends RuntimeEnvironment {
+  override def newClientContext(akkaConf: Config): ClientContext = {
+    new LocalClientContext(akkaConf)
+  }
+}
+
+object LocalRuntimeEnvironemnt {
+  class LocalClientContext private (cluster: EmbeddedCluster)
--- End diff --

`LocalRuntimeEnvironment` and `LocalClientContext` may not be the most
appropriate names here. Remember that we already have a `local` cluster?


> Allow examples to be ran in IDE
> ---
>
> Key: GEARPUMP-331
> URL: https://issues.apache.org/jira/browse/GEARPUMP-331
> Project: Apache Gearpump
>  Issue Type: Improvement
>Reporter: Huafeng Wang
>Assignee: Huafeng Wang
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (GEARPUMP-331) Allow examples to be ran in IDE

2017-07-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/GEARPUMP-331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16104490#comment-16104490
 ] 

ASF GitHub Bot commented on GEARPUMP-331:
-

Github user manuzhang commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/202#discussion_r130016858
  
--- Diff: 
core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala 
---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.client
+
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.embedded.LocalRuntimeEnvironemnt
+
+abstract class RuntimeEnvironment {
--- End diff --

Please explain briefly what `RuntimeEnvironment`,
`LocalRuntimeEnvironment`, and `RemoteEnvironment` are.
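
For readers following the thread, the relationship being asked about can be sketched roughly as below. This is a minimal, self-contained illustration and not the PR's code: `ClientContext` is reduced to a stub trait, the Akka `Config` is replaced by a plain `Map`, and `RemoteRuntimeEnvironment`, `EmbeddedRuntimeEnvironment`, and `setRuntimeEnv` are hypothetical names. Only `RuntimeEnvironment.get()` and `newClientContext` appear in the diffs quoted in this thread.

```scala
object RuntimeEnvSketch {
  // Stub standing in for Gearpump's ClientContext (assumption for illustration).
  trait ClientContext { def describe: String }

  // Abstract factory: each environment decides how a ClientContext is built.
  abstract class RuntimeEnvironment {
    def newClientContext(conf: Map[String, String]): ClientContext
  }

  // Hypothetical: would connect to an already running cluster.
  class RemoteRuntimeEnvironment extends RuntimeEnvironment {
    override def newClientContext(conf: Map[String, String]): ClientContext = {
      val master = conf.getOrElse("master", "unknown")
      new ClientContext { val describe = s"remote($master)" }
    }
  }

  // Hypothetical: would start an in-process cluster, e.g. when run from an IDE.
  class EmbeddedRuntimeEnvironment extends RuntimeEnvironment {
    override def newClientContext(conf: Map[String, String]): ClientContext =
      new ClientContext { val describe = "embedded" }
  }

  object RuntimeEnvironment {
    // Defaults to the remote environment; a launcher may swap it.
    @volatile private var current: RuntimeEnvironment = new RemoteRuntimeEnvironment
    def get(): RuntimeEnvironment = current
    def setRuntimeEnv(env: RuntimeEnvironment): Unit = current = env
  }
}
```

Under these assumptions, application code only ever calls `RuntimeEnvironment.get().newClientContext(...)`, and whichever launcher starts the application decides which environment is installed.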




[jira] [Commented] (GEARPUMP-331) Allow examples to be ran in IDE

2017-07-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/GEARPUMP-331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16104477#comment-16104477
 ] 

ASF GitHub Bot commented on GEARPUMP-331:
-

Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/202#discussion_r130015303
  
--- Diff: docs/contents/dev/dev-write-1st-app.md ---
@@ -9,7 +9,7 @@ We'll use the classical 
[wordcount](https://github.com/apache/incubator-gearpump
  override val options: Array[(String, CLIOption[Any])] = Array.empty

  override def main(akkaConf: Config, args: Array[String]): Unit = {
-   val context = ClientContext(akkaConf)
+   val context = RuntimeEnvironment.get().newClientContext(akkaConf)
--- End diff --

```
def apply(config: Config): ClientContext = {
  RuntimeEnvironment.get().newClientContext(config)
}
```
Like this? Then what about
```
def apply(config: Config, system: ActorSystem, master: ActorRef): ClientContext
```





[GitHub] incubator-gearpump pull request #198: [GEARPUMP-311] refactor state manageme...

2017-07-27 Thread manuzhang
Github user manuzhang commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/198#discussion_r129997628
  
--- Diff: 
examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/refactor/WordCount.scala
 ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.wordcount.dsl.refactor
+
+import org.apache.commons.logging.{Log, LogFactory}
+import org.apache.gearpump.streaming.refactor.coder.{StringUtf8Coder, VarLongCoder}
+import org.apache.gearpump.streaming.refactor.dsl.api.functions.MapWithStateFunction
+import org.apache.gearpump.streaming.refactor.dsl.scalaapi.StreamApp
+import org.apache.gearpump.streaming.refactor.dsl.scalaapi.functions.FlatMapWithStateFunction
+import org.apache.gearpump.streaming.refactor.state.api.{StateInternals, ValueState}
+import org.apache.gearpump.streaming.refactor.state.{RuntimeContext, StateNamespaces, StateTags}
+import org.apache.hadoop.conf.Configuration
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
+import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory
+import org.apache.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation
+import org.apache.gearpump.streaming.state.impl.PersistentStateConfig
+import org.apache.gearpump.streaming.{Processor, StreamApplication}
+import org.apache.gearpump.util.{AkkaApp, Graph}
+
+/**
+ *
+ */
+object WordCount extends AkkaApp with ArgumentsParser {
+
+  override val options: Array[(String, CLIOption[Any])] = Array.empty
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val context = ClientContext(akkaConf)
+
+    val hadoopConfig = new Configuration
+    val checkpointStoreFactory = new HadoopCheckpointStoreFactory("MessageConsume", hadoopConfig,
+      // Rotates on 1MB
+      new FileSizeRotation(100))
+    val taskConfig = UserConfig.empty
+      .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
+      .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L)
+      .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY,
+        checkpointStoreFactory)(context.system)
+
+    val app = StreamApp("dsl", context, taskConfig)
+    val data = "This is a good start, bingo!! bingo!!"
+    app.source(data.lines.toList, 1, "source").
+      // word => (word, count)
+      flatMapWithState(new StatefulFlatMapFunction(), "a stateful flatmap udf").
+      mapWithState(new StatefulMapFunction(), "").
+      // (word, count1), (word, count2) => (word, count1 + count2)
+      groupByKey().sum.log
+
+    context.submit(app).waitUntilFinish()
+    context.close()
+  }
+
+
+  private class StatefulFlatMapFunction
+    extends FlatMapWithStateFunction[String, String] {
+
+    private val logger: Log = LogFactory.getLog(getClass)
+
+    private implicit val counterStateTag = "tag1"
+
+    private var stateInternals: Option[StateInternals] = None
+    private var counterState: Option[ValueState[java.lang.Long]] = None
--- End diff --

What is `counterState` used for?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-gearpump pull request #198: [GEARPUMP-311] refactor state manageme...

2017-07-27 Thread manuzhang
Github user manuzhang commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/198#discussion_r129996891
  
--- Diff: 
examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/refactor/WordCount.scala
 ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.wordcount.dsl.refactor
+
+import org.apache.commons.logging.{Log, LogFactory}
+import org.apache.gearpump.streaming.refactor.coder.{StringUtf8Coder, VarLongCoder}
+import org.apache.gearpump.streaming.refactor.dsl.api.functions.MapWithStateFunction
+import org.apache.gearpump.streaming.refactor.dsl.scalaapi.StreamApp
+import org.apache.gearpump.streaming.refactor.dsl.scalaapi.functions.FlatMapWithStateFunction
+import org.apache.gearpump.streaming.refactor.state.api.{StateInternals, ValueState}
+import org.apache.gearpump.streaming.refactor.state.{RuntimeContext, StateNamespaces, StateTags}
+import org.apache.hadoop.conf.Configuration
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
+import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory
+import org.apache.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation
+import org.apache.gearpump.streaming.state.impl.PersistentStateConfig
+import org.apache.gearpump.streaming.{Processor, StreamApplication}
+import org.apache.gearpump.util.{AkkaApp, Graph}
+
+/**
+ *
+ */
+object WordCount extends AkkaApp with ArgumentsParser {
+
+  override val options: Array[(String, CLIOption[Any])] = Array.empty
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val context = ClientContext(akkaConf)
+
+    val hadoopConfig = new Configuration
+    val checkpointStoreFactory = new HadoopCheckpointStoreFactory("MessageConsume", hadoopConfig,
+      // Rotates on 1MB
+      new FileSizeRotation(100))
+    val taskConfig = UserConfig.empty
+      .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
+      .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L)
+      .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY,
+        checkpointStoreFactory)(context.system)
+
+    val app = StreamApp("dsl", context, taskConfig)
+    val data = "This is a good start, bingo!! bingo!!"
+    app.source(data.lines.toList, 1, "source").
+      // word => (word, count)
+      flatMapWithState(new StatefulFlatMapFunction(), "a stateful flatmap udf").
+      mapWithState(new StatefulMapFunction(), "").
+      // (word, count1), (word, count2) => (word, count1 + count2)
+      groupByKey().sum.log
+
+    context.submit(app).waitUntilFinish()
+    context.close()
+  }
+
+
+  private class StatefulFlatMapFunction
+    extends FlatMapWithStateFunction[String, String] {
+
+    private val logger: Log = LogFactory.getLog(getClass)
+
+    private implicit val counterStateTag = "tag1"
+
+    private var stateInternals: Option[StateInternals] = None
--- End diff --

"*Internals" should not be exposed to users. If it is a user-facing
interface, please rename it.
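
One way to read this comment is sketched below: keep the low-level store private and hand user functions a narrow, user-facing handle instead. Everything here is hypothetical and simplified for illustration. The `StateInternals` and `ValueState` in this sketch are stand-ins that merely reuse the names from the diff, and `StateContext` and `valueState` are invented; none of it reflects the PR's actual signatures.

```scala
object StateFacadeSketch {
  import scala.collection.mutable

  // Stand-in for an internal, low-level store keyed by (namespace, tag).
  // It is private to this object, so user code cannot reference it.
  private[StateFacadeSketch] class StateInternals {
    private val cells = mutable.Map.empty[(String, String), Long]
    def read(namespace: String, tag: String): Long =
      cells.getOrElse((namespace, tag), 0L)
    def write(namespace: String, tag: String, value: Long): Unit =
      cells((namespace, tag)) = value
  }

  // User-facing handle: exposes only what a user function needs.
  trait ValueState {
    def get: Long
    def set(value: Long): Unit
  }

  // Hypothetical entry point handed to user functions.
  class StateContext {
    private val internals = new StateInternals
    def valueState(tag: String): ValueState = new ValueState {
      def get: Long = internals.read("global", tag)
      def set(value: Long): Unit = internals.write("global", tag, value)
    }
  }
}
```

With this shape, a user function would hold a `ValueState` obtained from the context and never see the internal store's type.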




[GitHub] incubator-gearpump issue #198: [GEARPUMP-311] refactor state management

2017-07-27 Thread manuzhang
Github user manuzhang commented on the issue:

https://github.com/apache/incubator-gearpump/pull/198
  
@yanghua @huafengw 
I pushed the changes to the `state` branch so that we can collaborate on
this.




[GitHub] incubator-gearpump pull request #198: [GEARPUMP-311] refactor state manageme...

2017-07-27 Thread manuzhang
Github user manuzhang commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/198#discussion_r129997142
  
--- Diff: 
examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/refactor/WordCount.scala
 ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.wordcount.dsl.refactor
+
+import org.apache.commons.logging.{Log, LogFactory}
+import org.apache.gearpump.streaming.refactor.coder.{StringUtf8Coder, VarLongCoder}
+import org.apache.gearpump.streaming.refactor.dsl.api.functions.MapWithStateFunction
+import org.apache.gearpump.streaming.refactor.dsl.scalaapi.StreamApp
+import org.apache.gearpump.streaming.refactor.dsl.scalaapi.functions.FlatMapWithStateFunction
+import org.apache.gearpump.streaming.refactor.state.api.{StateInternals, ValueState}
+import org.apache.gearpump.streaming.refactor.state.{RuntimeContext, StateNamespaces, StateTags}
+import org.apache.hadoop.conf.Configuration
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
+import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory
+import org.apache.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation
+import org.apache.gearpump.streaming.state.impl.PersistentStateConfig
+import org.apache.gearpump.streaming.{Processor, StreamApplication}
+import org.apache.gearpump.util.{AkkaApp, Graph}
+
+/**
+ *
+ */
+object WordCount extends AkkaApp with ArgumentsParser {
+
+  override val options: Array[(String, CLIOption[Any])] = Array.empty
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val context = ClientContext(akkaConf)
+
+    val hadoopConfig = new Configuration
+    val checkpointStoreFactory = new HadoopCheckpointStoreFactory("MessageConsume", hadoopConfig,
+      // Rotates on 1MB
+      new FileSizeRotation(100))
+    val taskConfig = UserConfig.empty
+      .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
+      .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L)
+      .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY,
+        checkpointStoreFactory)(context.system)
+
+    val app = StreamApp("dsl", context, taskConfig)
+    val data = "This is a good start, bingo!! bingo!!"
+    app.source(data.lines.toList, 1, "source").
+      // word => (word, count)
+      flatMapWithState(new StatefulFlatMapFunction(), "a stateful flatmap udf").
+      mapWithState(new StatefulMapFunction(), "").
+      // (word, count1), (word, count2) => (word, count1 + count2)
+      groupByKey().sum.log
+
+    context.submit(app).waitUntilFinish()
+    context.close()
+  }
+
+
+  private class StatefulFlatMapFunction
+    extends FlatMapWithStateFunction[String, String] {
+
+    private val logger: Log = LogFactory.getLog(getClass)
+
+    private implicit val counterStateTag = "tag1"
--- End diff --

We'd better refrain from using Scala implicits in user land and provide
an explicit interface instead.
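
To illustrate the concern, the sketch below contrasts an implicit-based lookup with an explicit one. `StateStore`, `getStateImplicit`, `getState`, and `put` are hypothetical names invented for this example; they are not part of the PR or of Gearpump's API.

```scala
object ExplicitTagSketch {
  import scala.collection.mutable

  // Hypothetical in-memory store keyed by a state tag.
  class StateStore {
    private val values = mutable.Map.empty[String, Long]

    // Implicit style: the tag is whatever implicit String happens to be in scope.
    def getStateImplicit()(implicit tag: String): Long =
      values.getOrElse(tag, 0L)

    // Explicit style: the caller names the tag at the call site.
    def getState(tag: String): Long =
      values.getOrElse(tag, 0L)

    def put(tag: String, value: Long): Unit =
      values(tag) = value
  }
}
```

With the explicit form, a reader sees which tag is used without searching for an implicit `String` in scope, and adding a second implicit `String` nearby cannot change or break the call.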




[GitHub] incubator-gearpump pull request #198: [GEARPUMP-311] refactor state manageme...

2017-07-27 Thread manuzhang
Github user manuzhang commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/198#discussion_r129996446
  
--- Diff: 
examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/refactor/ProduceProcessor.scala
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.state.refactor
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.refactor.state.{RuntimeContext, StatefulTask}
+import org.apache.gearpump.streaming.task.TaskContext
+
+/**
+ *  a produce processor for generating a specific num sequence
+ */
+class ProduceProcessor(taskContext: TaskContext, conf: UserConfig)
--- End diff --

It seems this class is not used.




[GitHub] incubator-gearpump issue #198: [GEARPUMP-311] refactor state management

2017-07-27 Thread manuzhang
Github user manuzhang commented on the issue:

https://github.com/apache/incubator-gearpump/pull/198
  
@yanghua Thanks, this is a great effort. I left several comments after a
quick glance through.

More generally,

1. This is a big PR that introduces new interfaces. Could we split it
into smaller ones?
2. I don't see the `WordCount` example as a strong use case for `State`. I'm
thinking about questions like "what is the state used for?" and "can it be
replaced with a stateless one?"
3. I see some existing classes copied under `refactor`. Can you modify the
existing code directly so that the review will be easier?
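
To make the second question concrete: a word count can be written with stateless per-record functions followed by a keyed aggregation, with no user-managed state. The sketch below uses plain Scala collections rather than Gearpump's DSL, so it only illustrates the shape of the computation; `wordCount` is a hypothetical helper, and the input string is the one used in the `WordCount` example under review.

```scala
object StatelessWordCountSketch {
  // Splits lines into words, pairs each word with 1, then sums per word.
  // No step keeps state of its own between records.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .map(word => (word, 1))
      .groupBy(_._1)
      .map { case (word, pairs) => word -> pairs.map(_._2).sum }

  def main(args: Array[String]): Unit =
    wordCount(Seq("This is a good start, bingo!! bingo!!")).foreach(println)
}
```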




[GitHub] incubator-gearpump pull request #198: [GEARPUMP-311] refactor state manageme...

2017-07-27 Thread manuzhang
Github user manuzhang commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/198#discussion_r129996504
  
--- Diff: 
examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/refactor/SumProcessor.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.state.refactor
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.refactor.coder._
+import org.apache.gearpump.streaming.refactor.state.api.{StateInternals, ValueState}
+import org.apache.gearpump.streaming.refactor.state.{RuntimeContext, StateNamespaces, StateTags, StatefulTask}
+import org.apache.gearpump.streaming.task.TaskContext
+
+/**
+ *  a sum processor for continues sum message from kafka
+ *  it is a example for using state api and verifying state exactly-once guarantee
+ */
+class SumProcessor(taskContext: TaskContext, conf: UserConfig)
--- End diff --

Is this used?




[jira] [Commented] (GEARPUMP-311) refactor state management

2017-07-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/GEARPUMP-311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16104345#comment-16104345
 ] 

ASF GitHub Bot commented on GEARPUMP-311:
-

Github user manuzhang commented on the issue:

https://github.com/apache/incubator-gearpump/pull/198
  
@yanghua @huafengw 
I pushed the changes to the `state` branch so that we can collaborate on
this.


> refactor state management
> -
>
> Key: GEARPUMP-311
> URL: https://issues.apache.org/jira/browse/GEARPUMP-311
> Project: Apache Gearpump
>  Issue Type: Improvement
>  Components: streaming
>Reporter: yanghua
>Assignee: yanghua
>Priority: Minor
>
> inspired by Apache Beam





[jira] [Commented] (GEARPUMP-311) refactor state management

2017-07-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/GEARPUMP-311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16104300#comment-16104300
 ] 

ASF GitHub Bot commented on GEARPUMP-311:
-

Github user manuzhang commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/198#discussion_r129996891
  
--- Diff: 
examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/refactor/WordCount.scala
 ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.wordcount.dsl.refactor
+
+import org.apache.commons.logging.{Log, LogFactory}
+import org.apache.gearpump.streaming.refactor.coder.{StringUtf8Coder, VarLongCoder}
+import org.apache.gearpump.streaming.refactor.dsl.api.functions.MapWithStateFunction
+import org.apache.gearpump.streaming.refactor.dsl.scalaapi.StreamApp
+import org.apache.gearpump.streaming.refactor.dsl.scalaapi.functions.FlatMapWithStateFunction
+import org.apache.gearpump.streaming.refactor.state.api.{StateInternals, ValueState}
+import org.apache.gearpump.streaming.refactor.state.{RuntimeContext, StateNamespaces, StateTags}
+import org.apache.hadoop.conf.Configuration
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
+import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory
+import org.apache.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation
+import org.apache.gearpump.streaming.state.impl.PersistentStateConfig
+import org.apache.gearpump.streaming.{Processor, StreamApplication}
+import org.apache.gearpump.util.{AkkaApp, Graph}
+
+/**
+ *
+ */
+object WordCount extends AkkaApp with ArgumentsParser {
+
+  override val options: Array[(String, CLIOption[Any])] = Array.empty
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val context = ClientContext(akkaConf)
+
+    val hadoopConfig = new Configuration
+    val checkpointStoreFactory = new HadoopCheckpointStoreFactory("MessageConsume", hadoopConfig,
+      // Rotates on 1MB
+      new FileSizeRotation(100))
+    val taskConfig = UserConfig.empty
+      .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
+      .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L)
+      .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY,
+        checkpointStoreFactory)(context.system)
+
+    val app = StreamApp("dsl", context, taskConfig)
+    val data = "This is a good start, bingo!! bingo!!"
+    app.source(data.lines.toList, 1, "source").
+      // word => (word, count)
+      flatMapWithState(new StatefulFlatMapFunction(), "a stateful flatmap udf").
+      mapWithState(new StatefulMapFunction(), "").
+      // (word, count1), (word, count2) => (word, count1 + count2)
+      groupByKey().sum.log
+
+    context.submit(app).waitUntilFinish()
+    context.close()
+  }
+
+
+  private class StatefulFlatMapFunction
+    extends FlatMapWithStateFunction[String, String] {
+
+    private val logger: Log = LogFactory.getLog(getClass)
+
+    private implicit val counterStateTag = "tag1"
+
+    private var stateInternals: Option[StateInternals] = None
--- End diff --

"*Internals" should not be exposed to users. If it is a user-facing
interface, please rename it.






[jira] [Commented] (GEARPUMP-311) refactor state management

2017-07-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/GEARPUMP-311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16104297#comment-16104297
 ] 

ASF GitHub Bot commented on GEARPUMP-311:
-

Github user manuzhang commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/198#discussion_r129996504
  
--- Diff: 
examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/refactor/SumProcessor.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.state.refactor
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.refactor.coder._
+import org.apache.gearpump.streaming.refactor.state.api.{StateInternals, 
ValueState}
+import org.apache.gearpump.streaming.refactor.state.{RuntimeContext, 
StateNamespaces, StateTags, StatefulTask}
+import org.apache.gearpump.streaming.task.TaskContext
+
+/**
+ *  a sum processor for continues sum message from kafka
+ *  it is a example for using state api and verifying state exactly-once guarantee
+ */
+class SumProcessor(taskContext: TaskContext, conf: UserConfig)
--- End diff --

Is this used?


> refactor state management
> -
>
> Key: GEARPUMP-311
> URL: https://issues.apache.org/jira/browse/GEARPUMP-311
> Project: Apache Gearpump
>  Issue Type: Improvement
>  Components: streaming
>Reporter: yanghua
>Assignee: yanghua
>Priority: Minor
>
> inspired by Apache Beam



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (GEARPUMP-311) refactor state management

2017-07-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/GEARPUMP-311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16104299#comment-16104299
 ] 

ASF GitHub Bot commented on GEARPUMP-311:
-

Github user manuzhang commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/198#discussion_r129997628
  
--- Diff: examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/refactor/WordCount.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.wordcount.dsl.refactor
+
+import org.apache.commons.logging.{Log, LogFactory}
+import org.apache.gearpump.streaming.refactor.coder.{StringUtf8Coder, VarLongCoder}
+import org.apache.gearpump.streaming.refactor.dsl.api.functions.MapWithStateFunction
+import org.apache.gearpump.streaming.refactor.dsl.scalaapi.StreamApp
+import org.apache.gearpump.streaming.refactor.dsl.scalaapi.functions.FlatMapWithStateFunction
+import org.apache.gearpump.streaming.refactor.state.api.{StateInternals, ValueState}
+import org.apache.gearpump.streaming.refactor.state.{RuntimeContext, StateNamespaces, StateTags}
+import org.apache.hadoop.conf.Configuration
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
+import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory
+import org.apache.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation
+import org.apache.gearpump.streaming.state.impl.PersistentStateConfig
+import org.apache.gearpump.streaming.{Processor, StreamApplication}
+import org.apache.gearpump.util.{AkkaApp, Graph}
+
+/**
+ *
+ */
+object WordCount extends AkkaApp with ArgumentsParser {
+
+  override val options: Array[(String, CLIOption[Any])] = Array.empty
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+val context = ClientContext(akkaConf)
+
+val hadoopConfig = new Configuration
+val checkpointStoreFactory = new HadoopCheckpointStoreFactory("MessageConsume", hadoopConfig,
+  // Rotates on 1MB
+  new FileSizeRotation(100))
+val taskConfig = UserConfig.empty
+  .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
+  .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L)
+  .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY,
+checkpointStoreFactory)(context.system)
+
+val app = StreamApp("dsl", context, taskConfig)
+val data = "This is a good start, bingo!! bingo!!"
+app.source(data.lines.toList, 1, "source").
+  // word => (word, count)
+  flatMapWithState(new StatefulFlatMapFunction(), "a stateful flatmap udf").
+  mapWithState(new StatefulMapFunction(), "").
+  // (word, count1), (word, count2) => (word, count1 + count2)
+  groupByKey().sum.log
+
+context.submit(app).waitUntilFinish()
+context.close()
+  }
+
+
+  private class StatefulFlatMapFunction
+extends FlatMapWithStateFunction[String, String] {
+
+private val logger: Log = LogFactory.getLog(getClass)
+
+private implicit val counterStateTag = "tag1"
+
+private var stateInternals: Option[StateInternals] = None
+private var counterState: Option[ValueState[java.lang.Long]] = None
--- End diff --

What is `counterState` used for?


> refactor state management
> -
>
> Key: GEARPUMP-311
> URL: https://issues.apache.org/jira/browse/GEARPUMP-311
> Project: Apache Gearpump
>  Issue Type: Improvement
>  Components: streaming
>Reporter: yanghua
>Assignee: yanghua
>Priority: Minor
>
> inspired 

[GitHub] incubator-gearpump issue #198: [GEARPUMP-311] refactor state management

2017-07-27 Thread manuzhang
Github user manuzhang commented on the issue:

https://github.com/apache/incubator-gearpump/pull/198
  
R: @manuzhang @huafengw 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (GEARPUMP-311) refactor state management

2017-07-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/GEARPUMP-311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16104215#comment-16104215
 ] 

ASF GitHub Bot commented on GEARPUMP-311:
-

Github user manuzhang commented on the issue:

https://github.com/apache/incubator-gearpump/pull/198
  
R: @manuzhang @huafengw 


> refactor state management
> -
>
> Key: GEARPUMP-311
> URL: https://issues.apache.org/jira/browse/GEARPUMP-311
> Project: Apache Gearpump
>  Issue Type: Improvement
>  Components: streaming
>Reporter: yanghua
>Assignee: yanghua
>Priority: Minor
>
> inspired by Apache Beam





[GitHub] incubator-gearpump pull request #202: [GEARPUMP-331] Allow applications can ...

2017-07-27 Thread huafengw
GitHub user huafengw opened a pull request:

https://github.com/apache/incubator-gearpump/pull/202

[GEARPUMP-331] Allow applications can be ran in IDE

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the commit message is formatted like:
   `[GEARPUMP-] Meaningful description of pull request` 
 - [ ] Make sure tests pass via `sbt clean test`.
 - [ ] Make sure old documentation affected by the pull request has been 
updated and new documentation added for new functionality. 



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huafengw/incubator-gearpump GP331

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-gearpump/pull/202.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #202


commit c95a826fa24375cafeac009b1388c8f5ab3d3792
Author: huafengw 
Date:   2017-07-27T05:09:28Z

[GEARPUMP-331] Allow applications can be ran in IDE






[GitHub] incubator-gearpump pull request #202: [GEARPUMP-331] Allow applications to b...

2017-07-27 Thread manuzhang
Github user manuzhang commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/202#discussion_r129773363
  
--- Diff: docs/contents/dev/dev-write-1st-app.md ---
@@ -9,7 +9,7 @@ We'll use the classical 
[wordcount](https://github.com/apache/incubator-gearpump
  override val options: Array[(String, CLIOption[Any])] = Array.empty

  override def main(akkaConf: Config, args: Array[String]): Unit = {
-   val context = ClientContext(akkaConf)
+   val context = RuntimeEnvironment.get().newClientContext(akkaConf)
--- End diff --

Can we hide `RuntimeEnvironment.get()` inside `ClientContext`, since users don't 
need to know about it? Another benefit is that existing applications would need 
no changes.
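
One way to read this suggestion, as a rough sketch with simplified stand-in types: the `Config` alias, the `RuntimeEnvironment` registry with its `set` method, and both context classes below are hypothetical and are not Gearpump's real definitions. The `ClientContext` companion's `apply` performs the environment lookup, so application code keeps calling `ClientContext(akkaConf)` unchanged.

```scala
// Hypothetical stand-ins for illustration only; not the actual Gearpump classes.
object ClientContextSketch {
  type Config = Map[String, String] // stand-in for com.typesafe.config.Config

  trait ClientContext { def describe: String }

  class RemoteClientContext(conf: Config) extends ClientContext {
    val describe = "remote"
  }

  class LocalClientContext(conf: Config) extends ClientContext {
    val describe = "local"
  }

  trait RuntimeEnvironment {
    def newClientContext(conf: Config): ClientContext
  }

  object RuntimeEnvironment {
    // Defaults to the remote environment; an IDE launcher could install a local one.
    @volatile private var current: RuntimeEnvironment = new RuntimeEnvironment {
      def newClientContext(conf: Config): ClientContext = new RemoteClientContext(conf)
    }

    def get(): RuntimeEnvironment = current

    def set(env: RuntimeEnvironment): Unit = {
      current = env
    }
  }

  object ClientContext {
    // The environment lookup is hidden here, so callers never touch RuntimeEnvironment.
    def apply(conf: Config): ClientContext =
      RuntimeEnvironment.get().newClientContext(conf)
  }
}
```

With this shape, only the launcher that wants an embedded cluster would call the (assumed) `RuntimeEnvironment.set`; existing applications would compile without edits.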




[GitHub] incubator-gearpump pull request #202: [GEARPUMP-331] Allow applications to b...

2017-07-27 Thread manuzhang
Github user manuzhang commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/202#discussion_r129772729
  
--- Diff: examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala ---
@@ -64,18 +64,13 @@ object WordCount extends AkkaApp with ArgumentsParser {
 val debugMode = config.getBoolean("debug")
 val sleepSeconds = config.getInt("sleep")
 
-val localCluster = if (debugMode) {
-  val cluster = new EmbeddedCluster(akkaConf: Config)
-  cluster.start()
-  Some(cluster)
+val runtimeEnv = if (debugMode) {
--- End diff --

Is the "debugMode" option necessary? If examples are run in an IDE, they are 
already in debug mode.




[GitHub] incubator-gearpump issue #202: [GEARPUMP-331] Allow applications to be run i...

2017-07-27 Thread codecov-io
Github user codecov-io commented on the issue:

https://github.com/apache/incubator-gearpump/pull/202
  
# [Codecov](https://codecov.io/gh/apache/incubator-gearpump/pull/202?src=pr=h1) Report
> Merging [#202](https://codecov.io/gh/apache/incubator-gearpump/pull/202?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-gearpump/commit/ac8ac039217725f9f924ec901fdf7adfd9f51657?src=pr=desc) will **decrease** coverage by `<.01%`.
> The diff coverage is `21.42%`.



```diff
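@@            Coverage Diff             @@
##           master     #202      +/-   ##
==========================================
- Coverage   71.72%   71.72%   -0.01%
==========================================
  Files         189      191       +2
  Lines        6104     6107       +3
  Branches      542      537       -5
==========================================
+ Hits         4378     4380       +2
- Misses       1726     1727       +1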
```






[GitHub] incubator-gearpump pull request #202: [GEARPUMP-331] Allow applications to b...

2017-07-27 Thread manuzhang
Github user manuzhang commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/202#discussion_r129772131
  
--- Diff: core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala ---
@@ -36,27 +36,21 @@ import org.apache.gearpump.util.{LogUtil, Util}
  * Create a in-process cluster with single worker
  */
 class EmbeddedCluster(inputConfig: Config) {
-
-  private val workerCount: Int = 1
-  private var _master: ActorRef = null
-  private var _system: ActorSystem = null
-  private var _config: Config = null
-
   private val LOG = LogUtil.getLogger(getClass)
+  private val workerCount: Int = 1
+  var _master: ActorRef = _
--- End diff --

Do we have to expose these mutable variables? BTW, how about removing the 
"_" prefix, since I don't see any conflicts here?
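
A minimal sketch of what this could look like, under stated assumptions: `EmbeddedClusterSketch`, `masterRef`, `isStarted` and the `String` placeholder for `ActorRef` are all hypothetical names chosen for illustration, not the code in the pull request. The mutable field stays private and callers only get a read-only accessor.

```scala
// Hypothetical sketch; a String stands in for ActorRef and a Map for Config.
class EmbeddedClusterSketch(inputConfig: Map[String, String]) {
  private val workerCount: Int = 1

  // Mutable state stays private, so nothing outside the class can reassign it.
  private var masterRef: Option[String] = None

  def start(): Unit = {
    masterRef = Some(s"master-with-$workerCount-worker")
  }

  // Read-only view for callers; fails fast if the cluster has not been started.
  def master: String =
    masterRef.getOrElse(throw new IllegalStateException("cluster not started"))

  def isStarted: Boolean = masterRef.isDefined
}
```

Using `Option` instead of a `null`-initialised `var` also makes the "not started yet" state explicit, which is a design choice of this sketch rather than something the review asks for.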




[jira] [Commented] (GEARPUMP-331) Allow examples to be ran in IDE

2017-07-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/GEARPUMP-331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16102874#comment-16102874
 ] 

ASF GitHub Bot commented on GEARPUMP-331:
-

Github user manuzhang commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/202#discussion_r129773363
  
--- Diff: docs/contents/dev/dev-write-1st-app.md ---
@@ -9,7 +9,7 @@ We'll use the classical 
[wordcount](https://github.com/apache/incubator-gearpump
  override val options: Array[(String, CLIOption[Any])] = Array.empty

  override def main(akkaConf: Config, args: Array[String]): Unit = {
-   val context = ClientContext(akkaConf)
+   val context = RuntimeEnvironment.get().newClientContext(akkaConf)
--- End diff --

Can we hide `RuntimeEnvironment.get()` inside `ClientContext`, since users don't 
need to know about it? Another benefit is that existing applications would need 
no changes.


> Allow examples to be ran in IDE
> ---
>
> Key: GEARPUMP-331
> URL: https://issues.apache.org/jira/browse/GEARPUMP-331
> Project: Apache Gearpump
>  Issue Type: Improvement
>Reporter: Huafeng Wang
>Assignee: Huafeng Wang
>






[jira] [Commented] (GEARPUMP-331) Allow examples to be ran in IDE

2017-07-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/GEARPUMP-331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16102867#comment-16102867
 ] 

ASF GitHub Bot commented on GEARPUMP-331:
-

Github user manuzhang commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/202#discussion_r129772729
  
--- Diff: examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala ---
@@ -64,18 +64,13 @@ object WordCount extends AkkaApp with ArgumentsParser {
 val debugMode = config.getBoolean("debug")
 val sleepSeconds = config.getInt("sleep")
 
-val localCluster = if (debugMode) {
-  val cluster = new EmbeddedCluster(akkaConf: Config)
-  cluster.start()
-  Some(cluster)
+val runtimeEnv = if (debugMode) {
--- End diff --

Is the "debugMode" option necessary? If examples are run in an IDE, they are 
already in debug mode.


> Allow examples to be ran in IDE
> ---
>
> Key: GEARPUMP-331
> URL: https://issues.apache.org/jira/browse/GEARPUMP-331
> Project: Apache Gearpump
>  Issue Type: Improvement
>Reporter: Huafeng Wang
>Assignee: Huafeng Wang
>






[jira] [Commented] (GEARPUMP-331) Allow examples to be ran in IDE

2017-07-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/GEARPUMP-331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16102858#comment-16102858
 ] 

ASF GitHub Bot commented on GEARPUMP-331:
-

Github user manuzhang commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/202#discussion_r129772131
  
--- Diff: core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala ---
@@ -36,27 +36,21 @@ import org.apache.gearpump.util.{LogUtil, Util}
  * Create a in-process cluster with single worker
  */
 class EmbeddedCluster(inputConfig: Config) {
-
-  private val workerCount: Int = 1
-  private var _master: ActorRef = null
-  private var _system: ActorSystem = null
-  private var _config: Config = null
-
   private val LOG = LogUtil.getLogger(getClass)
+  private val workerCount: Int = 1
+  var _master: ActorRef = _
--- End diff --

Do we have to expose these mutable variables? BTW, how about removing the 
"_" prefix, since I don't see any conflicts here?


> Allow examples to be ran in IDE
> ---
>
> Key: GEARPUMP-331
> URL: https://issues.apache.org/jira/browse/GEARPUMP-331
> Project: Apache Gearpump
>  Issue Type: Improvement
>Reporter: Huafeng Wang
>Assignee: Huafeng Wang
>






[jira] [Created] (GEARPUMP-332) Split KafkaConfig into KafkaSourceConfig and KafkaSourceConfig

2017-07-27 Thread darion yaphet (JIRA)
darion yaphet created GEARPUMP-332:
--

 Summary: Split KafkaConfig into KafkaSourceConfig and 
KafkaSourceConfig
 Key: GEARPUMP-332
 URL: https://issues.apache.org/jira/browse/GEARPUMP-332
 Project: Apache Gearpump
  Issue Type: Improvement
  Components: kafka
Affects Versions: 0.8.4
Reporter: darion yaphet
Assignee: darion yaphet
Priority: Minor
 Fix For: 0.8.5


*KafkaConfig* includes both the Kafka producer's and the consumer's config 
parameters. We can split it into *KafkaSourceConfig* and *KafkaSinkConfig* to 
control KafkaSource and KafkaSink behaviour separately.
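
As a rough illustration of the proposed split, assuming the second class is meant to be a sink-side config (the description mentions KafkaSink): the class names, fields and the `split` helper below are hypothetical and are not part of Gearpump's API. Only the property keys are real Kafka client settings, used here as sample data.

```scala
// Hypothetical shapes; names and fields are assumptions, not Gearpump's API.
final case class KafkaSourceConfigSketch(consumerProps: Map[String, String])
final case class KafkaSinkConfigSketch(producerProps: Map[String, String])

object KafkaConfigSplit {
  // Real Kafka client property names, used only as sample keys.
  private val consumerOnly = Set("group.id", "auto.offset.reset")
  private val producerOnly = Set("acks", "retries")

  // Shared keys (e.g. bootstrap.servers) go to both sides; side-specific keys to one.
  def split(all: Map[String, String]): (KafkaSourceConfigSketch, KafkaSinkConfigSketch) = {
    val source = all.filter { case (key, _) => !producerOnly(key) }
    val sink = all.filter { case (key, _) => !consumerOnly(key) }
    (KafkaSourceConfigSketch(source), KafkaSinkConfigSketch(sink))
  }
}
```

Each side then sees only the parameters that affect it, which is the behaviour the issue description asks for.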





[jira] [Commented] (GEARPUMP-331) Allow examples to be ran in IDE

2017-07-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/GEARPUMP-331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16102780#comment-16102780
 ] 

ASF GitHub Bot commented on GEARPUMP-331:
-

Github user codecov-io commented on the issue:

https://github.com/apache/incubator-gearpump/pull/202
  
# [Codecov](https://codecov.io/gh/apache/incubator-gearpump/pull/202?src=pr=h1) Report
> Merging [#202](https://codecov.io/gh/apache/incubator-gearpump/pull/202?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-gearpump/commit/ac8ac039217725f9f924ec901fdf7adfd9f51657?src=pr=desc) will **decrease** coverage by `<.01%`.
> The diff coverage is `21.42%`.



```diff
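@@            Coverage Diff             @@
##           master     #202      +/-   ##
==========================================
- Coverage   71.72%   71.72%   -0.01%
==========================================
  Files         189      191       +2
  Lines        6104     6107       +3
  Branches      542      537       -5
==========================================
+ Hits         4378     4380       +2
- Misses       1726     1727       +1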
```




> Allow examples to be ran in IDE
> ---
>
> Key: GEARPUMP-331
> URL: https://issues.apache.org/jira/browse/GEARPUMP-331
> Project: Apache Gearpump
>  Issue Type: Improvement
>Reporter: Huafeng Wang
>Assignee: Huafeng Wang
>



