Github user chinmaykolhatkar commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/292#discussion_r66887202
--- Diff:
contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java
---
@@ -37,20 +37,21 @@
import com.datatorrent.api.Context.OperatorContext;
/**
+ * HBasePOJOInputOperator reads data from a HBase store, converts it to a
POJO and puts it on the output port.
+ * The read from HBase is asynchronous.
* @displayName HBase Input Operator
* @category Input
* @tags database, nosql, pojo, hbase
* @since 3.1.0
*/
@Evolving
-public class HBasePOJOInputOperator extends HBaseInputOperator
+public class HBasePOJOInputOperator extends HBaseScanOperator
{
private TableInfo tableInfo;
- protected HBaseStore store;
private String pojoTypeName;
- private String startRow;
- private String lastReadRow;
+ private Scan scan;
+ // Transients
protected transient Class pojoType;
--- End diff --
Can you make pojoType, fieldValueGenerator & valueConverter private?
I don't see these being used outside of this class.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
Github user chinmaykolhatkar commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/292#discussion_r66886863
--- Diff:
contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java
---
@@ -69,116 +70,105 @@ public Object convertValue( HBaseFieldInfo fieldInfo,
Object value)
public void setup(OperatorContext context)
{
try {
- store.connect();
+ super.setup(context);
pojoType = Class.forName(pojoTypeName);
pojoType.newInstance(); //try create new instance to verify the
class.
rowSetter = PojoUtils.createSetter(pojoType,
tableInfo.getRowOrIdExpression(), String.class);
- fieldValueGenerator =
FieldValueGenerator.getFieldValueGenerator(pojoType, tableInfo.getFieldsInfo()
);
+ fieldValueGenerator =
HBaseFieldValueGenerator.getHBaseFieldValueGenerator(pojoType,
tableInfo.getFieldsInfo() );
--- End diff --
You could just use the constructor of HBaseFieldValueGenerator... Do you
see any advantage in having another public static entry point for generating
this object?
---
> Improve HBasePOJOInputOperator with support for threaded read
> -
>
> Key: APEXMALHAR-1957
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1957
> Project: Apache Apex Malhar
> Issue Type: Task
>Reporter: Bhupesh Chawda
>Assignee: Bhupesh Chawda
>
> Add the following support to the HBase POJO Input Operator:
> * Add support for threaded read
> * Allow specifying a set of "column family: column" and fetch data only for
> these columns
> * Allow specifying an end row key to stop scanning
> * Add metrics
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
Github user chinmaykolhatkar commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/292#discussion_r66885832
--- Diff:
contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseScanOperator.java ---
@@ -40,27 +51,107 @@
* @tags hbase, scan, input operator
* @since 0.3.2
*/
-public abstract class HBaseScanOperator extends HBaseInputOperator
+public abstract class HBaseScanOperator extends HBaseInputOperator
implements Operator.ActivationListener
{
+  public static final int DEF_HINT_SCAN_LOOKAHEAD = 2;
+  public static final int DEF_QUEUE_SIZE = 1000;
+  public static final int DEF_SLEEP_MILLIS = 10;
+
+  private String startRow;
+  private String endRow;
+  private String lastReadRow;
+  private int hintScanLookahead = DEF_HINT_SCAN_LOOKAHEAD;
+  private int queueSize = DEF_QUEUE_SIZE;
+  private int sleepMillis = DEF_SLEEP_MILLIS;
+  private Queue resultQueue;
+  private boolean threadFailed = false;
+
+  @AutoMetric
+  protected long tuplesRead;
+
+  // Transients
+  protected transient Scan scan;
+  protected transient ResultScanner scanner;
+  protected transient Thread readThread;
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+    resultQueue = Queues.newLinkedBlockingQueue(queueSize);
+  }
+
+  @Override
+  public void activate(Context context)
+  {
+    startReadThread();
+  }
+
+  protected void startReadThread()
+  {
+    try {
+      scan = operationScan();
+      scanner = getStore().getTable().getScanner(scan);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    readThread = new Thread(new Runnable() {
+      @Override
+      public void run()
+      {
+        try {
+          Result result;
+          while ((result = scanner.next()) != null) {
+            while (!resultQueue.offer(result)) {
+              Thread.sleep(sleepMillis);
+            }
+          }
+        } catch (Exception e) {
+          logger.debug("Exception in fetching results {}", e.getMessage());
+          threadFailed = true;
+          throw new RuntimeException(e);
+        } finally {
+          scanner.close();
+        }
+      }
+    });
+    readThread.start();
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    tuplesRead = 0;
+  }
   @Override
   public void emitTuples()
   {
+    if (!readThread.isAlive() && threadFailed) {
+      throw new RuntimeException("Exception in scan thread");
--- End diff --
Should we not ideally restart the thread and let the operator continue
running?
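One possible shape for the restart idea raised here, sketched in plain Java with no Apex or HBase dependency. Everything in it is hypothetical (`Source`, `RestartableReader`, `poll`, the `restarts` counter); it only illustrates a consumer that restarts a failed reader from the first position that was not yet queued, instead of throwing.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a scanner that can be reopened at a given position. */
interface Source
{
  /** Returns the item at the given position, or null when the source is exhausted. */
  String read(int position) throws Exception;
}

/** Sketch: a reader thread that the consumer restarts after a failure. */
class RestartableReader
{
  private final Source source;
  private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);
  private volatile boolean threadFailed;  // written by the reader, read by the consumer
  private volatile int nextPosition;      // first position that has not been queued yet
  private Thread readThread;
  int restarts;                           // touched only by the consumer thread

  RestartableReader(Source source)
  {
    this.source = source;
  }

  void start()
  {
    threadFailed = false;
    readThread = new Thread(() -> {
      try {
        String item;
        while ((item = source.read(nextPosition)) != null) {
          queue.put(item);   // blocks while the queue is full
          nextPosition++;    // advance only after the item has been queued
        }
      } catch (Exception e) {
        threadFailed = true; // leave recovery to the consumer
      }
    });
    readThread.setDaemon(true);
    readThread.start();
  }

  /** Called from the consumer thread; restarts a failed reader instead of throwing. */
  String poll(long timeoutMillis)
  {
    if (!readThread.isAlive() && threadFailed) {
      restarts++;
      start();               // resumes from nextPosition
    }
    try {
      return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
  }
}
```

Whether an unbounded number of restarts is acceptable, or whether the operator should give up after some limit, is a separate design decision that this sketch leaves open.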
---
[
https://issues.apache.org/jira/browse/APEXMALHAR-1957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328590#comment-15328590
]
ASF GitHub Bot commented on APEXMALHAR-1957:
Github user chinmaykolhatkar commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/292#discussion_r66886741
--- Diff:
contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java
---
@@ -69,116 +70,105 @@ public Object convertValue( HBaseFieldInfo fieldInfo,
Object value)
public void setup(OperatorContext context)
{
try {
- store.connect();
+ super.setup(context);
pojoType = Class.forName(pojoTypeName);
--- End diff --
Yes please.
[
https://issues.apache.org/jira/browse/APEXMALHAR-1957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328481#comment-15328481
]
ASF GitHub Bot commented on APEXMALHAR-1957:
Github user chinmaykolhatkar commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/292#discussion_r66884332
--- Diff:
contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseFieldValueGenerator.java
---
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.hbase;
+
+import java.util.List;
+
+import com.datatorrent.lib.util.FieldValueGenerator;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * A {@link FieldValueGenerator} implementation for {@link HBaseFieldInfo}
+ */
+public class HBaseFieldValueGenerator extends
FieldValueGenerator
+{
+ public static final String COLON = ":";
+
+ public static HBaseFieldValueGenerator getHBaseFieldValueGenerator(final
Class clazz, List
+ fieldInfos)
+ {
+return new HBaseFieldValueGenerator(clazz, fieldInfos);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected HBaseFieldValueGenerator(final Class clazz,
List fieldInfos)
+ {
+for (HBaseFieldInfo fieldInfo : fieldInfos) {
+ fieldInfoMap.put(fieldInfo.getFamilyName() + COLON +
fieldInfo.getColumnName(), fieldInfo);
+
+ PojoUtils.Getter
---
[
https://issues.apache.org/jira/browse/APEXMALHAR-1957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328582#comment-15328582
]
ASF GitHub Bot commented on APEXMALHAR-1957:
Github user chinmaykolhatkar commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/292#discussion_r66886409
--- Diff:
contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseScanOperator.java ---
@@ -79,4 +170,79 @@ public void emitTuples()
*/
protected abstract T getTuple(Result result);
+  /**
+   * Returns the start row key in the table as set previously
+   * @return {@link #startRow}
+   */
+  public String getStartRow()
+  {
+    return startRow;
+  }
+
+  /**
+   * Sets the start row key in the table from where the scan should begin
+   * @param startRow
+   */
+  public void setStartRow(String startRow)
+  {
+    this.startRow = startRow;
+  }
+
+  /**
+   * Returns the end row key in the table as set previously
+   * @return {@link #endRow}
+   */
+  public String getEndRow()
+  {
+    return endRow;
+  }
+
+  /**
+   * Sets the end row key in the table where the scan should end
+   * @param endRow
+   */
+  public void setEndRow(String endRow)
+  {
+    this.endRow = endRow;
+  }
+
+  /**
+   * Returns the last read row key from the hbase table
+   * @return {@link #lastReadRow}
+   */
+  public String getLastReadRow()
+  {
+    return lastReadRow;
+  }
+
+  /**
+   * Sets the last read row key from the hbase table. After the failures, the new scan will start from this row key
+   * @param lastReadRow
+   */
+  public void setLastReadRow(String lastReadRow)
+  {
+    this.lastReadRow = lastReadRow;
+  }
+
+  /**
+   * Returns the Scan HINT_LOOKAHEAD parameter as configured. Default is {@value #DEF_HINT_SCAN_LOOKAHEAD}
--- End diff --
Can you please check whether the HINT_LOOKAHEAD feature is available in all
HBase client and server versions?
If not, can you please document that this operator works best with a certain
version and above, and that this field will be ignored for all other versions?
Github user chinmaykolhatkar commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/292#discussion_r66884310
--- Diff:
contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseFieldValueGenerator.java
---
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.hbase;
+
+import java.util.List;
+
+import com.datatorrent.lib.util.FieldValueGenerator;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * A {@link FieldValueGenerator} implementation for {@link HBaseFieldInfo}
+ */
+public class HBaseFieldValueGenerator extends FieldValueGenerator
+{
+  public static final String COLON = ":";
+
+  public static HBaseFieldValueGenerator getHBaseFieldValueGenerator(final Class clazz, List
+    fieldInfos)
+  {
+    return new HBaseFieldValueGenerator(clazz, fieldInfos);
+  }
+
+  @SuppressWarnings("unchecked")
+  protected HBaseFieldValueGenerator(final Class clazz, List fieldInfos)
+  {
+    for (HBaseFieldInfo fieldInfo : fieldInfos) {
+      fieldInfoMap.put(fieldInfo.getFamilyName() + COLON + fieldInfo.getColumnName(), fieldInfo);
+
+      PojoUtils.Getter getter =
+        PojoUtils.createGetter(clazz, fieldInfo.getPojoFieldExpression(), fieldInfo.getType().getJavaType());
+      fieldGetterMap.put(fieldInfo, getter);
--- End diff --
Would it be more efficient to key fieldGetterMap by
familyName:columnName instead of by an object?
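A minimal sketch of what string-keyed getters could look like, assuming the same "family:column" key format the diff already uses for fieldInfoMap. The `Customer` POJO, the `ColumnKeyedGetters` class and the use of `java.util.function.Function` in place of `PojoUtils.Getter` are all hypothetical and chosen only to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical POJO used only for this illustration. */
class Customer
{
  String name = "alice";
  int age = 30;
}

/** Sketch: getters keyed by "family:column" strings rather than by field-info objects. */
class ColumnKeyedGetters
{
  static final String COLON = ":";
  private final Map<String, Function<Customer, Object>> getters = new HashMap<>();

  void register(String family, String column, Function<Customer, Object> getter)
  {
    getters.put(family + COLON + column, getter);
  }

  /** Looks up the getter using the same key format as at registration time. */
  Object valueOf(Customer pojo, String family, String column)
  {
    Function<Customer, Object> getter = getters.get(family + COLON + column);
    return getter == null ? null : getter.apply(pojo);
  }
}
```

Whether this is actually faster than an object key depends on how the key object implements hashCode() and equals(), and on whether the key string has to be rebuilt for every lookup, so it would need to be measured.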
---
Github user chinmaykolhatkar commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/302#discussion_r66880779
--- Diff:
library/src/main/java/com/datatorrent/lib/math/AbstractOutput.java ---
@@ -38,23 +38,23 @@
* Double type output.
*/
@OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort doubleResult = new
DefaultOutputPort();
+ public transient DefaultOutputPort doubleResult = new
DefaultOutputPort();
--- End diff --
@ilganeli This won't work. The port variable is kept final because it is
used in Application.java to connect the ports. Once it's used in addStream in
Application.java, the reference to this variable is kept and the same object
will be used. Hence, even if the object is recreated in the child class, that
won't take effect.
The suggested approach for adding a unifier to the superclass's port is as follows:
1. Do the following in the parent class:
``
public final transient DefaultOutputPort<Double> doubleResult = new DefaultOutputPort<Double>()
{
  @Override
  public Operator.Unifier<Double> getUnifier()
  {
    // Qualify the call so it reaches the operator's method, not this port's own getUnifier().
    return AbstractOutput.this.getUnifier();
  }
};

protected Operator.Unifier<Double> getUnifier()
{
  return null;
}
``
2. Do the following in the child class:
``
@Override
protected Operator.Unifier<Double> getUnifier()
{
  return new UnifierSumNumber<>();
}
``
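The pattern described in this comment can be tried out without Apex on the classpath. The sketch below uses hypothetical stand-ins (`Unifier`, `Port`, `ParentOperator`, `SumOperator`) rather than the real Apex types. It shows the port field staying final while a subclass supplies the unifier through an overridable hook. Note that inside the anonymous port class the call to the operator's hook has to be qualified; an unqualified `getUnifier()` there would resolve to the port's own method and recurse.

```java
/** Hypothetical stand-in for Operator.Unifier. */
interface Unifier
{
  String name();
}

/** Hypothetical stand-in for DefaultOutputPort. */
class Port
{
  Unifier getUnifier()
  {
    return null;
  }
}

class ParentOperator
{
  // The port stays final, so a reference taken when wiring the application remains valid.
  final Port result = new Port()
  {
    @Override
    Unifier getUnifier()
    {
      // Qualified call into the enclosing operator, which subclasses can override.
      return ParentOperator.this.getUnifier();
    }
  };

  /** Hook for subclasses; no unifier by default. */
  protected Unifier getUnifier()
  {
    return null;
  }
}

class SumOperator extends ParentOperator
{
  @Override
  protected Unifier getUnifier()
  {
    return () -> "sum";
  }
}
```

With these classes, `new ParentOperator().result.getUnifier()` yields null, while `new SumOperator().result.getUnifier()` yields the subclass's unifier, even though the port object itself is created once in the parent and never replaced.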
---
> Make sigma operator partition-able
> --
>
> Key: APEXMALHAR-139
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-139
> Project: Apache Apex Malhar
> Issue Type: Task
>Reporter: Chandni Singh
>
> The Sigma operator in the Malhar library doesn't seem to be stateful, contrary
> to what the Javadoc states.
> This operator can be made partitionable by adding a Unifier to the output ports.
GitHub user chinmaykolhatkar opened a pull request:
https://github.com/apache/apex-malhar/pull/317
APEXMALHAR-2117 Generate default queryStmt even if its set to empty string
Also added the missing hsqldb Maven dependency for test scope.
@sandeepdeshmukh Can you please review and merge?
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/chinmaykolhatkar/apex-malhar APEXMALHAR-2117
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/apex-malhar/pull/317.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #317
commit 882e4568a0176723cdc31de4eef021f6376b2c6c
Author: Chinmay Kolhatkar
Date: 2016-06-13T22:02:01Z
APEXMALHAR-2117 Generate default queryStmt even if its set to empty string.
Also added missing hsqldb mavne dependency for test scope.
---
Chinmay Kolhatkar created APEXMALHAR-2117:
-
Summary: JDBCLoader for Enricher should generate default queryStmt
even when value is empty string
Key: APEXMALHAR-2117
URL: https://issues.apache.org/jira/browse/APEXMALHAR-2117
Project: Apache Apex Malhar
Issue Type: Bug
Components: utilities
Affects Versions: 3.4.0
Reporter: Chinmay Kolhatkar
Assignee: Chinmay Kolhatkar
Currently, JDBCLoader creates a default queryStmt from the configured properties
only when queryStmt is set to null.
It should do the same when queryStmt is set to an empty string.
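The requested behaviour amounts to treating null and the empty string alike. A minimal sketch follows, assuming a hypothetical `QueryConfig` class and a placeholder default statement; neither is taken from JDBCLoader itself.

```java
/** Hypothetical holder used only to illustrate the null-or-empty check. */
class QueryConfig
{
  private final String tableName;
  private String queryStmt;

  QueryConfig(String tableName)
  {
    this.tableName = tableName;
  }

  void setQueryStmt(String queryStmt)
  {
    this.queryStmt = queryStmt;
  }

  /** Falls back to a generated statement when none was configured. */
  String effectiveQuery()
  {
    if (queryStmt == null || queryStmt.isEmpty()) {
      // Placeholder default; the statement JDBCLoader really generates is not shown here.
      return "select * from " + tableName;
    }
    return queryStmt;
  }
}
```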
[
https://issues.apache.org/jira/browse/APEXMALHAR-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chinmay Kolhatkar resolved APEXMALHAR-2111.
---
Resolution: Done
Fix Version/s: 3.5.0
> Projection Operator config params shall use List instead of comma-separated
> field names
> ---
>
> Key: APEXMALHAR-2111
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2111
> Project: Apache Apex Malhar
> Issue Type: Improvement
>Reporter: Pradeep A. Dalvi
>Assignee: Pradeep A. Dalvi
> Fix For: 3.5.0
>
>
> Projection Operator accepts 2 config params: selectFields & dropFields.
> Currently both of these parameters accept values as comma-separated field
> names. However, this is not in line with how other operators accept multiple
> values; they use a List instead.
> So this proposal is to change Projection Operator to accept these config
> params as a List instead of a comma-separated string. Hence selectFields &
> dropFields shall accept a List going forward.
[
https://issues.apache.org/jira/browse/APEXMALHAR-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327897#comment-15327897
]
ASF GitHub Bot commented on APEXMALHAR-2111:
Github user asfgit closed the pull request at:
https://github.com/apache/apex-malhar/pull/311
---
Hi All,
I am proposing to add a new API to the 0.8 version of
AbstractKafkaInputOperator:
void emitMessage(KafkaMessage message)
"message" carries details such as the offset, the Kafka partition, and the
value of the message.
With this addition, users get more control over each message. This method
would be called from the emitTuples() API.
To maintain backward compatibility, the default definition of this new method
would be as below:
void emitMessage(KafkaMessage message)
{
  emitTuple(message.msg);
}
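To make the compatibility argument concrete, here is a small self-contained sketch. It does not use the real Kafka or Apex classes: `KafkaMessage` is reduced to three fields, its `msg` is assumed to be a String, and `InputOperatorSketch`, `LegacyOperator` and `OffsetAwareOperator` are hypothetical names. An existing subclass that only implements emitTuple keeps working, while a new subclass can override emitMessage to see the offset and partition.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical message holder; only the msg field name comes from the proposal. */
class KafkaMessage
{
  final long offset;
  final int partition;
  final String msg;

  KafkaMessage(long offset, int partition, String msg)
  {
    this.offset = offset;
    this.partition = partition;
    this.msg = msg;
  }
}

abstract class InputOperatorSketch
{
  /** Existing hook that current subclasses already implement. */
  protected abstract void emitTuple(String msg);

  /** New hook; the default delegates, so existing subclasses behave as before. */
  protected void emitMessage(KafkaMessage message)
  {
    emitTuple(message.msg);
  }

  /** Stand-in for emitTuples(): calls the new hook once per message. */
  void emitTuples(List<KafkaMessage> batch)
  {
    for (KafkaMessage message : batch) {
      emitMessage(message);
    }
  }
}

/** An operator written before the new hook existed. */
class LegacyOperator extends InputOperatorSketch
{
  final List<String> out = new ArrayList<>();

  @Override
  protected void emitTuple(String msg)
  {
    out.add(msg);
  }
}

/** An operator that uses the extra message details. */
class OffsetAwareOperator extends LegacyOperator
{
  @Override
  protected void emitMessage(KafkaMessage message)
  {
    out.add(message.partition + "/" + message.offset + ":" + message.msg);
  }
}
```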
Please share your thoughts on this approach.
Regards,
Chaitanya