[GitHub] apex-malhar pull request #292: APEXMALHAR-1957: Updated HBase Input Operator

2016-06-13 Thread chinmaykolhatkar
Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/292#discussion_r66887202
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java 
---
@@ -37,20 +37,21 @@
 import com.datatorrent.api.Context.OperatorContext;
 
 /**
+ * HBasePOJOInputOperator reads data from a HBase store, converts it to a POJO and puts it on the output port.
+ * The read from HBase is asynchronous.
  * @displayName HBase Input Operator
  * @category Input
  * @tags database, nosql, pojo, hbase
  * @since 3.1.0
  */
 @Evolving
-public class HBasePOJOInputOperator extends HBaseInputOperator
+public class HBasePOJOInputOperator extends HBaseScanOperator
 {
   private TableInfo tableInfo;
-  protected HBaseStore store;
   private String pojoTypeName;
-  private String startRow;
-  private String lastReadRow;
+  private Scan scan;
 
+  // Transients
   protected transient Class pojoType;
--- End diff --

Can you make pojoType, fieldValueGenerator and valueConverter private?
I don't see them being used outside of this class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] apex-malhar pull request #292: APEXMALHAR-1957: Updated HBase Input Operator

2016-06-13 Thread chinmaykolhatkar
Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/292#discussion_r66886863
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java 
---
@@ -69,116 +70,105 @@ public Object convertValue( HBaseFieldInfo fieldInfo, Object value)
   public void setup(OperatorContext context)
   {
     try {
-      store.connect();
+      super.setup(context);
       pojoType = Class.forName(pojoTypeName);
       pojoType.newInstance();   //try create new instance to verify the class.
       rowSetter = PojoUtils.createSetter(pojoType, tableInfo.getRowOrIdExpression(), String.class);
-      fieldValueGenerator = FieldValueGenerator.getFieldValueGenerator(pojoType, tableInfo.getFieldsInfo() );
+      fieldValueGenerator = HBaseFieldValueGenerator.getHBaseFieldValueGenerator(pojoType, tableInfo.getFieldsInfo() );
--- End diff --

You could just use the constructor of HBaseFieldValueGenerator. Do you
see any advantage in having another public static entry point for creating
this object?




[jira] [Commented] (APEXMALHAR-1957) Improve HBasePOJOInputOperator with support for threaded read

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328593#comment-15328593
 ] 

ASF GitHub Bot commented on APEXMALHAR-1957:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/292#discussion_r66886863
  


> Improve HBasePOJOInputOperator with support for threaded read
> -
>
> Key: APEXMALHAR-1957
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1957
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Bhupesh Chawda
>Assignee: Bhupesh Chawda
>
> Add the following support to Hbase POJO Input Operator:
> * Add support for threaded read
> * Allow to specify a set of "column family: column" and fetch data only for 
> these columns
> * Allow to specify an end row key to stop scanning
> * Add metrics



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] apex-malhar pull request #292: APEXMALHAR-1957: Updated HBase Input Operator

2016-06-13 Thread chinmaykolhatkar
Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/292#discussion_r66885832
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseScanOperator.java ---
@@ -40,27 +51,107 @@
  * @tags hbase, scan, input operator
  * @since 0.3.2
  */
-public abstract class HBaseScanOperator extends HBaseInputOperator
+public abstract class HBaseScanOperator extends HBaseInputOperator implements Operator.ActivationListener
 {
+  public static final int DEF_HINT_SCAN_LOOKAHEAD = 2;
+  public static final int DEF_QUEUE_SIZE = 1000;
+  public static final int DEF_SLEEP_MILLIS = 10;
+
+  private String startRow;
+  private String endRow;
+  private String lastReadRow;
+  private int hintScanLookahead = DEF_HINT_SCAN_LOOKAHEAD;
+  private int queueSize = DEF_QUEUE_SIZE;
+  private int sleepMillis = DEF_SLEEP_MILLIS;
+  private Queue resultQueue;
+  private boolean threadFailed = false;
+
+  @AutoMetric
+  protected long tuplesRead;
+
+  // Transients
+  protected transient Scan scan;
+  protected transient ResultScanner scanner;
+  protected transient Thread readThread;
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+    resultQueue = Queues.newLinkedBlockingQueue(queueSize);
+  }
+
+  @Override
+  public void activate(Context context)
+  {
+    startReadThread();
+  }
+
+  protected void startReadThread()
+  {
+    try {
+      scan = operationScan();
+      scanner = getStore().getTable().getScanner(scan);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    readThread = new Thread(new Runnable() {
+      @Override
+      public void run()
+      {
+        try {
+          Result result;
+          while ((result = scanner.next()) != null) {
+            while (!resultQueue.offer(result)) {
+              Thread.sleep(sleepMillis);
+            }
+          }
+        } catch (Exception e) {
+          logger.debug("Exception in fetching results {}", e.getMessage());
+          threadFailed = true;
+          throw new RuntimeException(e);
+        } finally {
+          scanner.close();
+        }
+      }
+    });
+    readThread.start();
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    tuplesRead = 0;
+  }
 
   @Override
   public void emitTuples()
   {
+    if (!readThread.isAlive() && threadFailed) {
+      throw new RuntimeException("Exception in scan thread");
--- End diff --

Should we not ideally restart the thread and let the operator continue 
running?
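
A minimal sketch of what such a restart could look like, assuming the reader remembers the last row it queued so that a new scan can resume after it. `RestartableReader`, `RowSource` and `openAfter` are hypothetical stand-ins and not part of the operator or of HBase; the real operator would reopen a ResultScanner from `lastReadRow` instead.

```java
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the operator's scan thread handling; no HBase types are used.
class RestartableReader
{
  /** Hypothetical source: opens rows that come after the given row key (null means from the start). */
  interface RowSource
  {
    Iterator<String> openAfter(String lastRow);
  }

  private final RowSource source;
  private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);
  private volatile boolean threadFailed;
  private volatile String lastQueuedRow;
  private Thread readThread;
  int restarts;

  RestartableReader(RowSource source)
  {
    this.source = source;
  }

  void startReadThread()
  {
    threadFailed = false;
    readThread = new Thread(() -> {
      try {
        Iterator<String> rows = source.openAfter(lastQueuedRow);
        while (rows.hasNext()) {
          String row = rows.next();
          queue.put(row);
          lastQueuedRow = row;  // remember progress so a restart can resume here
        }
      } catch (Exception e) {
        threadFailed = true;    // record the failure; the operator thread decides what to do
      }
    });
    readThread.start();
  }

  /** Called from the operator thread, like emitTuples(): restart a dead, failed reader, then poll. */
  String poll()
  {
    if (threadFailed && !readThread.isAlive()) {
      restarts++;
      startReadThread();
    }
    return queue.poll();
  }
}
```

A real implementation would probably also cap the number of restarts, so that a permanent failure still surfaces as an exception instead of looping forever.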




[jira] [Commented] (APEXMALHAR-1957) Improve HBasePOJOInputOperator with support for threaded read

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328590#comment-15328590
 ] 

ASF GitHub Bot commented on APEXMALHAR-1957:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/292#discussion_r66886741
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java 
---
@@ -69,116 +70,105 @@ public Object convertValue( HBaseFieldInfo fieldInfo, Object value)
   public void setup(OperatorContext context)
   {
     try {
-      store.connect();
+      super.setup(context);
       pojoType = Class.forName(pojoTypeName);
--- End diff --

Yes please.




[jira] [Commented] (APEXMALHAR-1957) Improve HBasePOJOInputOperator with support for threaded read

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328481#comment-15328481
 ] 

ASF GitHub Bot commented on APEXMALHAR-1957:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/292#discussion_r66884332
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseFieldValueGenerator.java
 ---
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.hbase;
+
+import java.util.List;
+
+import com.datatorrent.lib.util.FieldValueGenerator;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * A {@link FieldValueGenerator} implementation for {@link HBaseFieldInfo}
+ */
+public class HBaseFieldValueGenerator extends FieldValueGenerator
+{
+  public static final String COLON = ":";
+
+  public static HBaseFieldValueGenerator getHBaseFieldValueGenerator(final Class clazz, List
+      fieldInfos)
+  {
+    return new HBaseFieldValueGenerator(clazz, fieldInfos);
+  }
+
+  @SuppressWarnings("unchecked")
+  protected HBaseFieldValueGenerator(final Class clazz, List fieldInfos)
+  {
+    for (HBaseFieldInfo fieldInfo : fieldInfos) {
+      fieldInfoMap.put(fieldInfo.getFamilyName() + COLON + fieldInfo.getColumnName(), fieldInfo);
+
+      PojoUtils.Getter getter =
+          PojoUtils.createGetter(clazz, fieldInfo.getPojoFieldExpression(), fieldInfo.getType().getJavaType());
+      fieldGetterMap.put(fieldInfo, getter);
+    }
+
+    for (HBaseFieldInfo fieldInfo : fieldInfos) {
+      PojoUtils.Setter setter =
+          PojoUtils.createSetter(clazz, fieldInfo.getPojoFieldExpression(), fieldInfo.getType().getJavaType());
+      fieldSetterMap.put(fieldInfo, setter);
--- End diff --

Would it be more efficient to key fieldSetterMap by the
familyName:columnName string instead of by the HBaseFieldInfo object?
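
To illustrate the idea, here is a small sketch in which setters are looked up directly by the "family:column" string, so each cell needs a single map lookup. `ColumnKeyedSetters` and `Person` are hypothetical, and plain lambdas stand in for `PojoUtils.Setter`; whether this is measurably faster than keying by the field-info object is an assumption that would need to be checked.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical sketch: setters keyed by "family:column" rather than by a field-info object.
class ColumnKeyedSetters
{
  /** Hypothetical POJO used only for this illustration. */
  static class Person
  {
    String name;
    int age;
  }

  static final String COLON = ":";

  private final Map<String, BiConsumer<Person, Object>> setters = new HashMap<>();

  ColumnKeyedSetters()
  {
    setters.put("info" + COLON + "name", (p, v) -> p.name = (String)v);
    setters.put("info" + COLON + "age", (p, v) -> p.age = (Integer)v);
  }

  /** Applies one cell value to the POJO; returns false when the column is not mapped. */
  boolean apply(Person pojo, String family, String column, Object value)
  {
    BiConsumer<Person, Object> setter = setters.get(family + COLON + column);
    if (setter == null) {
      return false;
    }
    setter.accept(pojo, value);
    return true;
  }
}
```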




[jira] [Commented] (APEXMALHAR-1957) Improve HBasePOJOInputOperator with support for threaded read

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328582#comment-15328582
 ] 

ASF GitHub Bot commented on APEXMALHAR-1957:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/292#discussion_r66886409
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseScanOperator.java ---
@@ -79,4 +170,79 @@ public void emitTuples()
    */
   protected abstract T getTuple(Result result);
 
+  /**
+   * Returns the start row key in the table as set previously
+   * @return {@link #startRow}
+   */
+  public String getStartRow()
+  {
+    return startRow;
+  }
+
+  /**
+   * Sets the start row key in the table from where the scan should begin
+   * @param startRow
+   */
+  public void setStartRow(String startRow)
+  {
+    this.startRow = startRow;
+  }
+
+  /**
+   * Returns the end row key in the table as set previously
+   * @return {@link #endRow}
+   */
+  public String getEndRow()
+  {
+    return endRow;
+  }
+
+  /**
+   * Sets the end row key in the table where the scan should end
+   * @param endRow
+   */
+  public void setEndRow(String endRow)
+  {
+    this.endRow = endRow;
+  }
+
+  /**
+   * Returns the last read row key from the hbase table
+   * @return {@link #lastReadRow}
+   */
+  public String getLastReadRow()
+  {
+    return lastReadRow;
+  }
+
+  /**
+   * Sets the last read row key from the hbase table. After the failures, the new scan will start from this row key
+   * @param lastReadRow
+   */
+  public void setLastReadRow(String lastReadRow)
+  {
+    this.lastReadRow = lastReadRow;
+  }
+
+  /**
+   * Returns the Scan HINT_LOOKAHEAD parameter as configured. Default is {@value #DEF_HINT_SCAN_LOOKAHEAD}
--- End diff --

Can you please check whether the HINT_LOOKAHEAD feature is available in all
HBase client and server versions?
If it is not, can you please document that this operator works best with a
certain version and above, and that this field is ignored on all other versions?




[GitHub] apex-malhar pull request #292: APEXMALHAR-1957: Updated HBase Input Operator

2016-06-13 Thread chinmaykolhatkar
Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/292#discussion_r66885954
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseScanOperator.java ---
@@ -40,27 +51,107 @@
  * @tags hbase, scan, input operator
  * @since 0.3.2
  */
-public abstract class HBaseScanOperator extends HBaseInputOperator
+public abstract class HBaseScanOperator extends HBaseInputOperator implements Operator.ActivationListener
 {
+  public static final int DEF_HINT_SCAN_LOOKAHEAD = 2;
+  public static final int DEF_QUEUE_SIZE = 1000;
+  public static final int DEF_SLEEP_MILLIS = 10;
+
+  private String startRow;
+  private String endRow;
+  private String lastReadRow;
+  private int hintScanLookahead = DEF_HINT_SCAN_LOOKAHEAD;
+  private int queueSize = DEF_QUEUE_SIZE;
+  private int sleepMillis = DEF_SLEEP_MILLIS;
+  private Queue resultQueue;
+  private boolean threadFailed = false;
+
+  @AutoMetric
+  protected long tuplesRead;
+
+  // Transients
+  protected transient Scan scan;
+  protected transient ResultScanner scanner;
+  protected transient Thread readThread;
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+    resultQueue = Queues.newLinkedBlockingQueue(queueSize);
+  }
+
+  @Override
+  public void activate(Context context)
+  {
+    startReadThread();
+  }
+
+  protected void startReadThread()
+  {
+    try {
+      scan = operationScan();
+      scanner = getStore().getTable().getScanner(scan);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    readThread = new Thread(new Runnable() {
+      @Override
+      public void run()
+      {
+        try {
+          Result result;
+          while ((result = scanner.next()) != null) {
+            while (!resultQueue.offer(result)) {
+              Thread.sleep(sleepMillis);
+            }
+          }
+        } catch (Exception e) {
+          logger.debug("Exception in fetching results {}", e.getMessage());
+          threadFailed = true;
+          throw new RuntimeException(e);
+        } finally {
+          scanner.close();
+        }
+      }
+    });
+    readThread.start();
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    tuplesRead = 0;
+  }
 
   @Override
   public void emitTuples()
   {
+    if (!readThread.isAlive() && threadFailed) {
+      throw new RuntimeException("Exception in scan thread");
+    }
     try {
-      HTable table = getTable();
-      Scan scan = operationScan();
-      ResultScanner scanner = table.getScanner(scan);
-      for (Result result : scanner) {
-        //KeyValue[] kvs = result.raw();
-        //T t = getTuple(kvs);
-        T t = getTuple(result);
-        outputPort.emit(t);
+      Result result = resultQueue.poll();
+      if (result == null) {
+        return;
+      }
+      T tuple = getTuple(result);
+      if (tuple != null) {
+        outputPort.emit(tuple);
+        tuplesRead++;
       }
     } catch (Exception e) {
-      e.printStackTrace();
+      throw new RuntimeException(e);
     }
   }
 
+  @Override
+  public void deactivate()
+  {
+    readThread.interrupt();
--- End diff --

I would suggest not interrupting the thread. Keep a flag that the thread checks,
set to true while it should keep running and to false when it should exit. Then
wait for the in-progress operation to finish.
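
A minimal sketch of this suggestion, assuming a volatile flag that the reader loop checks and a join() in deactivate. `FlaggedReader` is a hypothetical stand-in, and a plain Iterator takes the place of the ResultScanner.

```java
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: stop the reader with a flag and join() rather than interrupt().
class FlaggedReader
{
  private final Iterator<String> rows;      // stand-in for a ResultScanner
  private final BlockingQueue<String> queue;
  private volatile boolean running;         // written by the operator thread, read by the reader
  private Thread readThread;

  FlaggedReader(Iterator<String> rows, int queueSize)
  {
    this.rows = rows;
    this.queue = new LinkedBlockingQueue<>(queueSize);
  }

  void activate()
  {
    running = true;
    readThread = new Thread(() -> {
      try {
        while (running && rows.hasNext()) {
          String row = rows.next();
          // Timed offer, so the flag is re-checked even while the queue stays full.
          while (running && !queue.offer(row, 10, TimeUnit.MILLISECONDS)) {
            // queue is full; loop and check the flag again
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    readThread.start();
  }

  void deactivate()
  {
    running = false;                        // ask the loop to exit
    try {
      readThread.join();                    // wait for the in-progress operation to finish
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  String poll()
  {
    return queue.poll();
  }

  boolean isReaderAlive()
  {
    return readThread.isAlive();
  }
}
```

The timed offer is one design choice among several; the point is only that the reader never blocks indefinitely without looking at the flag.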




[GitHub] apex-malhar pull request #292: APEXMALHAR-1957: Updated HBase Input Operator

2016-06-13 Thread chinmaykolhatkar
Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/292#discussion_r66884310
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseFieldValueGenerator.java
 ---
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.hbase;
+
+import java.util.List;
+
+import com.datatorrent.lib.util.FieldValueGenerator;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * A {@link FieldValueGenerator} implementation for {@link HBaseFieldInfo}
+ */
+public class HBaseFieldValueGenerator extends FieldValueGenerator
+{
+  public static final String COLON = ":";
+
+  public static HBaseFieldValueGenerator getHBaseFieldValueGenerator(final Class clazz, List
+      fieldInfos)
+  {
+    return new HBaseFieldValueGenerator(clazz, fieldInfos);
+  }
+
+  @SuppressWarnings("unchecked")
+  protected HBaseFieldValueGenerator(final Class clazz, List fieldInfos)
+  {
+    for (HBaseFieldInfo fieldInfo : fieldInfos) {
+      fieldInfoMap.put(fieldInfo.getFamilyName() + COLON + fieldInfo.getColumnName(), fieldInfo);
+
+      PojoUtils.Getter getter =
+          PojoUtils.createGetter(clazz, fieldInfo.getPojoFieldExpression(), fieldInfo.getType().getJavaType());
+      fieldGetterMap.put(fieldInfo, getter);
--- End diff --

Would it be more efficient to key fieldGetterMap by the
familyName:columnName string instead of by the HBaseFieldInfo object?




[GitHub] apex-malhar pull request #302: [APEXMALHAR-139] Make Sigma Operator Partitio...

2016-06-13 Thread chinmaykolhatkar
Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/302#discussion_r66880779
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/math/AbstractOutput.java ---
@@ -38,23 +38,23 @@
    * Double type output.
    */
   @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort doubleResult = new DefaultOutputPort();
+  public transient DefaultOutputPort doubleResult = new DefaultOutputPort();
--- End diff --

@ilganeli This won't work. The port variable is kept final because it is
used in Application.java to connect the ports. Once it is passed to addStream
in Application.java, the reference to that object is kept and the same object
is used from then on. Hence, even if the object is re-created in the child
class, that won't take effect.

The suggested approach for adding a unifier to the super class's port is as follows:

1. Do the following in the parent class:
```
public final transient DefaultOutputPort<Double> doubleResult = new DefaultOutputPort<Double>()
{
  @Override
  public Operator.Unifier<Double> getUnifier()
  {
    // Qualify the call so it reaches the operator's method instead of recursing into this one.
    return AbstractOutput.this.getUnifier();
  }
};

protected Operator.Unifier<Double> getUnifier()
{
  return null;
}
```
2. Do the following in the child class:
```
@Override
protected Operator.Unifier<Double> getUnifier()
{
  return new UnifierSumNumber<>();
}
```




[jira] [Commented] (APEXMALHAR-139) Make sigma operator partition-able

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328436#comment-15328436
 ] 

ASF GitHub Bot commented on APEXMALHAR-139:
---

Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/302#discussion_r66880779
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/math/AbstractOutput.java ---
@@ -38,23 +38,23 @@
* Double type output.
*/
   @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort doubleResult = new 
DefaultOutputPort();
+  public transient DefaultOutputPort doubleResult = new 
DefaultOutputPort();
--- End diff --

@ilganeli This won't work. The purpose of keeping port variable final is 
because this variable is used in Application.java to connect the ports. Once 
its used in addStream in application.java, the reference to this variable is 
kept and the same object will be used. Hence even if the object is recreated in 
child class, that won't take effect.

Suggested approach for adding unifier to super class's port is as follows:

1. Do following in parent class:
``
public final transient DefaultOutputPort doubleResult = new 
DefaultOutputPort<>()
{
   @Override
  public Operator.Unifier getUnifier()
  {
 return getUnifier();
  }
}

protected Operator.Unifier getUnifier()
{
   return null;
}
``
2. Do following in child class:
``
@Override
protected Operator.Unifier getUnifier()
{
return new UnifierSumNumber<>();
}
``


> Make sigma operator partition-able
> --
>
> Key: APEXMALHAR-139
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-139
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Chandni Singh
>
> Sigma operator in Malhar library doesn't seem to be Stateful as stated in the 
> java-docs.
> This operator can be partitioned by adding a Unifier to the output ports.





[GitHub] apex-malhar pull request #317: APEXMALHAR-2117 Generate default queryStmt ev...

2016-06-13 Thread chinmaykolhatkar
GitHub user chinmaykolhatkar opened a pull request:

https://github.com/apache/apex-malhar/pull/317

APEXMALHAR-2117 Generate default queryStmt even if its set to empty string

Also added the missing hsqldb Maven dependency for test scope.

@sandeepdeshmukh  Can you please review and merge?

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chinmaykolhatkar/apex-malhar APEXMALHAR-2117

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/317.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #317


commit 882e4568a0176723cdc31de4eef021f6376b2c6c
Author: Chinmay Kolhatkar 
Date:   2016-06-13T22:02:01Z

APEXMALHAR-2117 Generate default queryStmt even if its set to empty string.
Also added missing hsqldb mavne dependency for test scope.






[jira] [Created] (APEXMALHAR-2117) JDBCLoader for Enricher should generate default queryStmt even when value is empty string

2016-06-13 Thread Chinmay Kolhatkar (JIRA)
Chinmay Kolhatkar created APEXMALHAR-2117:
-

 Summary: JDBCLoader for Enricher should generate default queryStmt 
even when value is empty string 
 Key: APEXMALHAR-2117
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2117
 Project: Apache Apex Malhar
  Issue Type: Bug
  Components: utilities
Affects Versions: 3.4.0
Reporter: Chinmay Kolhatkar
Assignee: Chinmay Kolhatkar


Currently, JDBCLoader generates a default queryStmt from the configured properties
only when queryStmt is null.
It should do the same when queryStmt is set to an empty string.
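
A minimal sketch of the intended check, assuming the default statement is built from a table name and a list of fields. `QueryStmtDefaults`, `effectiveQuery`, its parameters and the generated SQL shape are all hypothetical and not JDBCLoader's actual code.

```java
import java.util.List;

// Hypothetical helper illustrating the null-or-empty check; not JDBCLoader's implementation.
class QueryStmtDefaults
{
  static String effectiveQuery(String queryStmt, String tableName, List<String> fields)
  {
    // Treat an empty string the same way as null: fall back to a generated statement.
    if (queryStmt == null || queryStmt.isEmpty()) {
      return "select " + String.join(", ", fields) + " from " + tableName;
    }
    return queryStmt;
  }
}
```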





[jira] [Resolved] (APEXMALHAR-2111) Projection Operator config params shall use List instead of comma-separated field names

2016-06-13 Thread Chinmay Kolhatkar (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chinmay Kolhatkar resolved APEXMALHAR-2111.
---
   Resolution: Done
Fix Version/s: 3.5.0

> Projection Operator config params shall use List instead of comma-separated 
> field names
> ---
>
> Key: APEXMALHAR-2111
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2111
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: Pradeep A. Dalvi
>Assignee: Pradeep A. Dalvi
> Fix For: 3.5.0
>
>
> Projection Operator accepts 2 config params: selectFields & dropFields. 
> Currently both parameters accept a string of comma-separated field names. 
> However, this is not in line with how other operators accept multiple 
> values, for which they use List instead.
> So this proposal is to change Projection Operator to accept these config 
> params as List instead of a comma-separated string. Hence selectFields & 
> dropFields shall accept List going forward.





[jira] [Commented] (APEXMALHAR-2111) Projection Operator config params shall use List instead of comma-separated field names

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327897#comment-15327897
 ] 

ASF GitHub Bot commented on APEXMALHAR-2111:


Github user asfgit closed the pull request at:

https://github.com/apache/apex-malhar/pull/311





new method emitMessage() in Kafka Operator

2016-06-13 Thread Chaitanya Chebolu
Hi All,

  I am proposing to add a new API to the 0.8 version of
AbstractKafkaInputOperator:
void emitMessage(KafkaMessage message).

  "message" carries details such as the offset, the Kafka partition, and the value of the message.

By adding this, users get more control over the message. This method would be
called from the emitTuples() API.

   To maintain backward compatibility, the default definition of this new method would be as
below:

void emitMessage(KafkaMessage message)
{
emitTuple(message.msg);
}
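
As a rough illustration of how this could preserve compatibility, the sketch below uses simplified stand-in types. `MessageSketch`, `InputOperatorSketch`, `LegacyOperator` and `OffsetAwareOperator` are hypothetical, as are the `partition` and `offset` field names and the use of byte[] as the payload type; only the delegation from emitMessage to emitTuple comes from the proposal.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for KafkaMessage; only the field name msg comes from the proposal.
class MessageSketch
{
  final int partition;
  final long offset;
  final byte[] msg;

  MessageSketch(int partition, long offset, byte[] msg)
  {
    this.partition = partition;
    this.offset = offset;
    this.msg = msg;
  }
}

// Hypothetical, simplified base class; not the real AbstractKafkaInputOperator.
abstract class InputOperatorSketch
{
  protected abstract void emitTuple(byte[] msg);

  // New hook: the default delegates, so existing subclasses keep working unchanged.
  protected void emitMessage(MessageSketch message)
  {
    emitTuple(message.msg);
  }

  // Stand-in for emitTuples(): every consumed message goes through emitMessage.
  void emitTuples(List<MessageSketch> consumed)
  {
    for (MessageSketch message : consumed) {
      emitMessage(message);
    }
  }
}

// An existing operator that only implements emitTuple.
class LegacyOperator extends InputOperatorSketch
{
  final List<String> emitted = new ArrayList<>();

  @Override
  protected void emitTuple(byte[] msg)
  {
    emitted.add(new String(msg, StandardCharsets.UTF_8));
  }
}

// A new operator that overrides emitMessage to use the partition and offset.
class OffsetAwareOperator extends LegacyOperator
{
  @Override
  protected void emitMessage(MessageSketch message)
  {
    emitted.add(message.partition + "@" + message.offset + ":" + new String(message.msg, StandardCharsets.UTF_8));
  }
}
```

In this sketch, an operator that overrides only emitTuple sees the same values as before, while one that overrides emitMessage can also read the partition and offset.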

   Please share your thoughts on this approach.

Regards,
Chaitanya