[GitHub] incubator-apex-malhar pull request #300: APEXMALHAR-2103 Fixed the scanner i...

2016-06-01 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/300#discussion_r65475558
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java ---
@@ -375,11 +375,14 @@ public void run()
 lastScannedInfo = null;
 numDiscoveredPerIteration = 0;
 for (String afile : files) {
-  String filePath = new File(afile).getAbsolutePath();
+  Path filePath = new Path(afile);
   LOG.debug("Scan started for input {}", filePath);
-  Map<String, Long> lastModifiedTimesForInputDir;
-  lastModifiedTimesForInputDir = referenceTimes.get(filePath);
-  scan(new Path(afile), null, lastModifiedTimesForInputDir);
+  Map<String, Long> lastModifiedTimesForInputDir = null;
+  if (fs.exists(filePath)) {
+    FileStatus fileStatus = fs.getFileStatus(filePath);
+    lastModifiedTimesForInputDir = referenceTimes.get(fileStatus.getPath().toUri().getPath());
--- End diff --

No. In createScannedFileInfo(), the directory/file path is specified as an 
absolute path. But in the case of the local file system, the configured files 
might be relative paths. 
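
To illustrate the mismatch (the paths below are hypothetical, not from the PR):

```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PathKeyMismatchDemo
{
  public static void main(String[] args) throws Exception
  {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    // A user may configure a relative path on the local file system
    // (assumes ./data/input exists)...
    Path filePath = new Path("data/input");
    FileStatus fileStatus = fs.getFileStatus(filePath);
    // ...but createScannedFileInfo() keys referenceTimes by the absolute
    // path, e.g. "/home/apex/data/input", so the lookup must normalize first:
    String key = fileStatus.getPath().toUri().getPath();
    System.out.println("configured: " + filePath + ", key: " + key);
  }
}
```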


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (APEXMALHAR-2106) Support merging multiple streams with StreamMerger

2016-06-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311597#comment-15311597
 ] 

ASF GitHub Bot commented on APEXMALHAR-2106:


Github user ilganeli commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/307#discussion_r65474494
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java ---
@@ -41,35 +47,57 @@
 @Stateless
 public class StreamMerger<K> extends BaseOperator
 {
-  /**
-   * Data input port 1.
-   */
-  public final transient DefaultInputPort<K> data1 = new DefaultInputPort<K>()
+  private int portCount = 0;
+  private final transient ArrayList<DefaultInputPort<K>> ports = new ArrayList<>(portCount);
--- End diff --

A great question - I was in the process of doing so. Short answer: it fails 
wonderfully:
```
java.lang.IllegalArgumentException: Port is not associated to any operator 
in the DAG: com.datatorrent.lib.stream.StreamMerger$1@3b084709
at 
com.datatorrent.stram.plan.logical.LogicalPlan.assertGetPortMeta(LogicalPlan.java:1501)
at 
com.datatorrent.stram.plan.logical.LogicalPlan.access$1000(LogicalPlan.java:119)
```

The relevant question, then, is whether there is any way to define ports in 
this way, e.g. as an array of ports, such that the DAG is aware of them.
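
For reference, the logical plan discovers ports by reflecting over the 
operator's fields (which is what assertGetPortMeta fails to find above), so 
each port has to be a declared field. A minimal sketch of the working 
pattern, with the generic parameter K assumed from the operator's signature:

```
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

public class StreamMerger<K> extends BaseOperator
{
  public final transient DefaultOutputPort<K> out = new DefaultOutputPort<>();

  // Ports must be fields of the operator class; instances stored only in a
  // collection are invisible to the DAG's reflection-based port discovery.
  public final transient DefaultInputPort<K> data1 = new DefaultInputPort<K>()
  {
    @Override
    public void process(K tuple)
    {
      out.emit(tuple);
    }
  };
}
```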


> Support merging multiple streams with StreamMerger 
> ---
>
> Key: APEXMALHAR-2106
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2106
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Ilya Ganelin
>Assignee: Ilya Ganelin
>
> To properly implement the Flatten transformation (and other Stream 
> combination operations), Apex must support merging data from multiple 
> sources. The StreamMerger operator can be improved to merge multiple streams, 
> rather than just the two streams it can handle in the present implementation. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2106) Support merging multiple streams with StreamMerger

2016-06-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311572#comment-15311572
 ] 

ASF GitHub Bot commented on APEXMALHAR-2106:


Github user gauravgopi123 commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/307#discussion_r65472797
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java ---
@@ -41,35 +47,57 @@
 @Stateless
 public class StreamMerger<K> extends BaseOperator
 {
-  /**
-   * Data input port 1.
-   */
-  public final transient DefaultInputPort<K> data1 = new DefaultInputPort<K>()
+  private int portCount = 0;
+  private final transient ArrayList<DefaultInputPort<K>> ports = new ArrayList<>(portCount);
--- End diff --

Did you use this operator in a DAG and see if it works?


> Support merging multiple streams with StreamMerger 
> ---
>
> Key: APEXMALHAR-2106
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2106
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Ilya Ganelin
>Assignee: Ilya Ganelin
>
> To properly implement the Flatten transformation (and other Stream 
> combination operations), Apex must support merging data from multiple 
> sources. The StreamMerger operator can be improved to merge multiple streams, 
> rather than just the two streams it can handle in the present implementation. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (APEXMALHAR-2099) Identify overlap between Beam API and existing Apex Stream API

2016-06-01 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311571#comment-15311571
 ] 

Ilya Ganelin edited comment on APEXMALHAR-2099 at 6/2/16 1:45 AM:
--

[~siyuan][~thw] Thanks for the comments. I've been following along all the 
linked work - Thomas, I've also looked into your Runner in detail. Based on the 
discussion here, the work in 
https://github.com/apache/incubator-beam/compare/master...tweise:master, and in 
https://issues.apache.org/jira/browse/APEXMALHAR-2085, this is a fantastic 
start! Siyuan - it sounds like there's clear agreement that we want to break 
the transformations down into windowed and un-windowed transforms; I think the 
next logical step is to build the Windowed Operator. This would allow us to 
implement a number of the transforms already present in the Runner as windowed 
transformations and subsequently implement trigger semantics. I'm obviously not 
on the ground with you guys, so I don't know in detail who is working on what. 
If possible, I would propose that we work together to formulate a set of 
concrete JIRAs that lay out the next steps so that we are not replicating 
effort. It might even make sense to do this in person. Then we can split up 
the labor and move forward in tackling specific problems.



was (Author: ilganeli):
[~siyuan][~thw] Thanks for the comments. I've been following along all the 
linked work - Thomas, I've also looked into your Runner in detail. Based on the 
discussion here, the work in 
https://github.com/apache/incubator-beam/compare/master...tweise:master and in 
https://issues.apache.org/jira/browse/APEXMALHAR-2085. This is a fantastic 
start! Siyuan - it sounds like there's a clear agreement that we want to break 
down the transformations into windowed and un-windowed transforms, I think the 
next logical step is to build the Windowed Operator. This would allow us to 
implement a number of the transforms already present in the Runner as windowed 
transformations and subsequently implement trigger semantics. I'm obviously not 
down on the ground with you guys, so I don't know in detail who is working on 
what. If possible, I would propose that we work together to formulate a set of 
concrete JIRAs that lay out the next steps so that we are not replicating 
effort. It might even make sense to do this in person. Then, we can split up 
the labor and move forward in tackling specific problems.


> Identify overlap between Beam API and existing Apex Stream API
> --
>
> Key: APEXMALHAR-2099
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2099
> Project: Apache Apex Malhar
>  Issue Type: Sub-task
>Reporter: Ilya Ganelin
>
> There should be some overlap between the Beam API and the recently released 
> Apex Stream API. This task captures the need to understand and document this 
> overlap.
> AC:
> * A document or JIRA comment identifying which components of the Beam API are 
> implemented, similar, or absent within the Apex Stream API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] incubator-apex-malhar pull request #307: [APEXMALHAR-2106] Support multiple ...

2016-06-01 Thread gauravgopi123
Github user gauravgopi123 commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/307#discussion_r65472797
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java ---
@@ -41,35 +47,57 @@
 @Stateless
 public class StreamMerger<K> extends BaseOperator
 {
-  /**
-   * Data input port 1.
-   */
-  public final transient DefaultInputPort<K> data1 = new DefaultInputPort<K>()
+  private int portCount = 0;
+  private final transient ArrayList<DefaultInputPort<K>> ports = new ArrayList<>(portCount);
--- End diff --

Did you use this operator in a DAG and see if it works?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (APEXMALHAR-2085) Implement Windowed Operators

2016-06-01 Thread David Yan (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311123#comment-15311123
 ] 

David Yan edited comment on APEXMALHAR-2085 at 6/1/16 9:43 PM:
---

Based on my understanding, this is what needs to be done, at a very high 
level.
Assuming T is the type of the tuples:

1. A watermark generator operator that takes T as input and generates 
TimestampedValue tuples along with watermark tuples. The watermark generator 
takes the following as configuration:
- The function to get the timestamp, the equivalent of a lambda T -> 
milliseconds from epoch
- The watermark type (perfect, heuristic, etc.). I need to do a little more 
research on how watermarks are actually generated.

2. A modified DimensionOperator. This has two stages:

* Stage 1: A window generator that takes TimestampedValue as input and 
generates WindowedValue (WindowedValue is an abstract class from Beam, which 
carries the window information for the tuple).
   ** The WindowFn object to assign the window(s) for each tuple
   ** Possibility of merging windows
* Stage 2: The actual pane generation, which takes WindowedValue as input 
and WindowedValue as output. The output includes any retraction values 
(as a result of lateness and window merging)

This operator takes the following as configuration:
  - The windowing function (type WindowFn): describes the windowing (global? 
fixed? session? session gap duration?)
  - Accumulation mode (type Enum): Accumulating, Discarding, or Accumulating & 
Retracting
  - Allowed lateness (type Duration): for dropping late tuples and purging old 
state (in conjunction with the committed checkpoint)
  - The aggregation (type lambda Iterable<T> -> R): how we want to aggregate 
the tuple data
  - Triggering (type Trigger): when we actually output the result to the 
output port

The DimensionOperator will need to implement the above features.
Also, the DimensionOperator will need to check whether T is actually a KV 
(instanceof) and, if so, aggregate the tuples by window AND key.

This is very preliminary and it's possible that I'm going down the wrong path, 
so please provide your feedback.
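
As a concrete starting point, below is a minimal sketch of step 1. The class 
name, the TimestampedValue holder, and the end-of-window heuristic are my 
assumptions for illustration, not an agreed design:

{code}
import java.util.function.Function;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

public class WatermarkGenerator<T> extends BaseOperator
{
  /** Simple holder pairing a tuple with its extracted event time. */
  public static class TimestampedValue<V>
  {
    public final V value;
    public final long timestampMillis;

    public TimestampedValue(V value, long timestampMillis)
    {
      this.value = value;
      this.timestampMillis = timestampMillis;
    }
  }

  // Configuration: the equivalent of lambda T -> milliseconds from epoch.
  private Function<T, Long> timestampFn;

  private long maxTimestampSeen = Long.MIN_VALUE;

  public final transient DefaultOutputPort<TimestampedValue<T>> output = new DefaultOutputPort<>();
  // Watermark tuples: "no tuple with an earlier event time should arrive".
  public final transient DefaultOutputPort<Long> watermarkOutput = new DefaultOutputPort<>();

  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
  {
    @Override
    public void process(T tuple)
    {
      long ts = timestampFn.apply(tuple);
      maxTimestampSeen = Math.max(maxTimestampSeen, ts);
      output.emit(new TimestampedValue<>(tuple, ts));
    }
  };

  @Override
  public void endWindow()
  {
    // Heuristic watermark: emit the highest event time seen so far at each
    // streaming window boundary; a perfect watermark needs source cooperation.
    if (maxTimestampSeen != Long.MIN_VALUE) {
      watermarkOutput.emit(maxTimestampSeen);
    }
  }

  public void setTimestampFn(Function<T, Long> timestampFn)
  {
    this.timestampFn = timestampFn;
  }
}
{code}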


was (Author: davidyan):
Based on my understanding, this looks like what needs to be done in a very high 
level.
Assuming T is the type of the tuples:

1. Watermark generator operator that takes T as the input and generate 
TimeStampedValue tuples with watermark tuples. The watermark generator takes 
the following as configuration.
- The function to get the timestamp, equivalent of lambda T -> milliseconds 
from epoch
- watermark type (perfect, heuristic, etc). I need a little more research on 
how watermark is actually generated

2. A modified DimensionOperator. This has two stages:

* Stage 1: Window generator that takes TimestampedValue as the input and 
generate WindowedValue (WindowedValue is an abstract class from Beam, which 
has the window information for the tuple).
   ** The WindowFn object to assign the window(s) for each tuple
   ** Possibility of merging windows
* Stage 2: The actual pane generation and takes WindowedValue as the input 
and WindowedValue as the output. The output includes any retraction values 
(as a result from lateness and window merging)

This operator takes the following as configuration:
  - The Windowing Function (type WindowFn): Describes how the windowing
  - Accumulation mode (type Enum): Accumulating, Discarding or Accumulating & 
Retracting
  - Allowed lateness (type Duration): For dropping late tuples and purging old 
state (in conjunction of committed checkpoint)
  - The Aggregation (type lambda Iterable -> R): How we want to aggregate 
the tuple data. 
  - Triggering (type Trigger): When we actually output the result to the output 
port

The DimensionOperator will need to implement the above features.
Also, the DimensionOperator will need to see whether T is actually a KV 
(instanceof) and if so, the tuples are aggregated by window AND key.

This is very preliminary and it's possible that I'm going down the wrong path. 
So please provide your feedback.

> Implement Windowed Operators
> 
>
> Key: APEXMALHAR-2085
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2085
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Siyuan Hua
>Assignee: Siyuan Hua
>
> As per several recent discussions in the community, a group of Windowed 
> Operators that delivers the window semantics of the Google Dataflow 
> model (https://cloud.google.com/dataflow/) is very important. 
> The operators should be designed and implemented in a way that supports: 
> High-level API
> Beam translation
> Easy use with other popular operators
> {panel:title=Operator Hierarchy}
> Hierarchy of the operators,
> The windowed operators should cover all possible transformations that 

[jira] [Comment Edited] (APEXMALHAR-2085) Implement Windowed Operators

2016-06-01 Thread David Yan (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311123#comment-15311123
 ] 

David Yan edited comment on APEXMALHAR-2085 at 6/1/16 9:13 PM:
---

Based on my understanding, this is what needs to be done, at a very high 
level.
Assuming T is the type of the tuples:

1. A watermark generator operator that takes T as input and generates 
TimestampedValue tuples along with watermark tuples. The watermark generator 
takes the following as configuration:
- The function to get the timestamp, the equivalent of a lambda T -> 
milliseconds from epoch
- The watermark type (perfect, heuristic, etc.). I need to do a little more 
research on how watermarks are actually generated.

2. A modified DimensionOperator. This has two stages:

* Stage 1: A window generator that takes TimestampedValue as input and 
generates WindowedValue (WindowedValue is an abstract class from Beam, which 
carries the window information for the tuple).
   ** The WindowFn object to assign the window(s) for each tuple
   ** Possibility of merging windows
* Stage 2: The actual pane generation, which takes WindowedValue as input 
and WindowedValue as output. The output includes any retraction values 
(as a result of lateness and window merging)

This operator takes the following as configuration:
  - Accumulation mode (type Enum): Accumulating, Discarding, or Accumulating & 
Retracting
  - Allowed lateness (type Duration): for dropping late tuples and purging old 
state (in conjunction with the committed checkpoint)
  - The aggregation (type lambda Iterable<T> -> R): how we want to aggregate 
the tuple data
  - Triggering (type Trigger): when we actually output the result to the 
output port

The DimensionOperator will need to implement the above features.
Also, the DimensionOperator will need to check whether T is actually a KV 
(instanceof) and, if so, aggregate the tuples by window AND key.

This is very preliminary and it's possible that I'm going down the wrong path, 
so please provide your feedback.


was (Author: davidyan):
Based on my understanding, this looks like what needs to be done in a very high 
level.
Assuming T is the type of the tuples:

1. Watermark generator operator that takes T as the input and generate 
TimeStampedValue tuples with watermark tuples. The watermark generator takes 
the following as configuration.
- The function to get the timestamp, equivalent of lambda T -> milliseconds 
from epoch
- watermark type (perfect, heuristic, etc). I need a little more research on 
how watermark is actually generated

2. A modified DimensionOperator. This has two stages:

* Stage 1: Window generator that takes TimestampedValue as the input and 
generate WindowedValue (WindowedValue is an abstract class from Beam, which 
has the window information for the tuple).
   ** The WindowFn object to assign the window(s) for each tuple
   ** Possibility of merging windows
* Stage 2: The actual pane generation and takes WindowedValue as the input 
and WindowedValue as the output. The output includes any retraction values.

This operator takes the following as configuration:
  - Accumulation mode (type Enum): Accumulating, Discarding or Accumulating & 
Retracting
  - Allowed lateness (type Duration): For dropping late tuples and purging old 
state (in conjunction of committed checkpoint)
  - The Aggregation (type lambda Iterable -> R): How we want to aggregate 
the tuple data. 
  - Triggering (type Trigger): When we actually output the result to the output 
port
The DimensionOperator will need to implement the above features.
Also, the DimensionOperator will need to see whether T is actually a KV 
(instanceof) and if so, the tuples are aggregated by window AND key.

This is very preliminary and it's possible that I'm going down the wrong path. 
So please provide your feedback.

> Implement Windowed Operators
> 
>
> Key: APEXMALHAR-2085
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2085
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Siyuan Hua
>Assignee: Siyuan Hua
>
> As per several recent discussions in the community, a group of Windowed 
> Operators that delivers the window semantics of the Google Dataflow 
> model (https://cloud.google.com/dataflow/) is very important. 
> The operators should be designed and implemented in a way that supports: 
> High-level API
> Beam translation
> Easy use with other popular operators
> {panel:title=Operator Hierarchy}
> Hierarchy of the operators,
> The windowed operators should cover all possible transformations that require 
> windows; batch processing is also considered a special window called the 
> global window
> {code}
>+---+
>+-> |  WindowedOperator | <+
>|   

[jira] [Commented] (APEXMALHAR-2094) Quantiles sketch operator

2016-06-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311082#comment-15311082
 ] 

ASF GitHub Bot commented on APEXMALHAR-2094:


Github user sandeep-n commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/301#discussion_r65440216
  
--- Diff: 
sketches/src/main/java/org/apache/apex/malhar/sketches/QuantilesEstimator.java 
---
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sketches;
+
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * An implementation of BaseOperator that computes a "sketch" (a
+ * representation of the probability distribution using a low memory
+ * footprint) of the incoming numeric data, and evaluates/outputs the
+ * cumulative distribution function and quantiles of the probability
+ * distribution. Leverages the quantiles sketch implementation from the
+ * Yahoo DataSketches library.
+ *
+ * Input Port(s) :
+ * data : data values input port.
+ *
+ * Output Port(s) :
+ * cdfOutput : cumulative distribution function output port.
+ * quantilesOutput : quantiles output port.
+ * pmfOutput : probability mass function output port.
+ *
+ * Partitions : No; partitioning will yield wrong results.
+ */
+@OperatorAnnotation(partitionable = false)
+public class QuantilesEstimator extends BaseOperator
+{
+  /**
+   * Output port that emits the cdf estimated at the current data point
+   */
+  public final transient DefaultOutputPort<Double> cdfOutput = new DefaultOutputPort<>();
+  /**
+   * Emits quantiles of the stream seen thus far
+   */
+  public final transient DefaultOutputPort<double[]> quantilesOutput = new DefaultOutputPort<>();
+  /**
+   * Emits probability masses on specified intervals
+   */
+  public final transient DefaultOutputPort<double[]> pmfOutput = new DefaultOutputPort<>();
+  private transient QuantilesSketch quantilesSketch = QuantilesSketch.builder().build();
+  /**
+   * This field determines the specific quantiles to be calculated.
+   * Default is set to compute the standard quartiles.
+   */
+  private double[] fractions = {0.0, 0.25, 0.50, 0.75, 1.00};
+  /**
+   * This field determines the intervals on which the probability mass 
function is computed.
+   */
+  private double[] pmfIntervals = {};
+  /**
+   * This operator computes three different quantities, which are output on
+   * separate output ports. If any of these quantities is not being used, the
+   * corresponding flag can be set to false to avoid unnecessary computation.
+   */
+  private boolean computeCdf = true;
+  private boolean computeQuantiles = true;
+  private boolean computePmf = true;
+  public final transient DefaultInputPort<Double> data = new DefaultInputPort<Double>()
+  {
+@Override
+public void process(Double input)
+{
+
+  quantilesSketch.update(input);
+
+  if (computeQuantiles) {
+/**
+ * Computes and emits quantiles of the stream seen thus far
+ */
+quantilesOutput.emit(quantilesSketch.getQuantiles(fractions));
+  }
+
+  if (computeCdf) {
+/**
+ * Emits (estimate of the) cumulative distribution function 
evaluated at the input value, according to the
+ * sketched probability distribution of the stream seen thus far.
+ */
+cdfOutput.emit(quantilesSketch.getCDF(new double[]{input})[0]);
+  }
+
+  if (computePmf) {
+pmfOutput.emit(quantilesSketch.getPMF(pmfIntervals));
+  }
+
+}
+  };
+
+  /**
+   * Constructor that allows non-default 

[jira] [Commented] (APEXMALHAR-2099) Identify overlap between Beam API and existing Apex Stream API

2016-06-01 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310987#comment-15310987
 ] 

Ilya Ganelin commented on APEXMALHAR-2099:
--

The current implementation of the Apex Stream API (ApexStreamImpl.java) 
supports the following functions:

- map
- flatMap
- filter
- reduce
- fold

On the Beam side, there is not a strict "API" as far as transformations go. 
Instead, Beam defines a PTransform class which implements an "apply" function 
that applies a given function to incoming data represented as a PCollection. 
There are presently on the order of 40 different transformations implemented 
for Beam: 
https://github.com/apache/incubator-beam/tree/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms

The analogs to the Stream API are: 
Apex => Beam
map => ParDo
flatMap => FlatMapElements
filter => Filter
reduce => Combine (sort of)

In general, Beam presently supports a much greater variety of transformations. 
It also supports different classes of transformation: for example, some 
transformations are applied over a window, while others are applied on a 
per-tuple basis. The windowing behavior can be explicitly specified by defining 
a windowing strategy. A key limitation of the current Apex Stream API is that 
it has no support for cross-stream interaction. Specifically, operations like 
groupByKey and join are not currently defined within the scope of the Apex 
Stream API; this is a serious limitation since it restricts the applications 
that can be built. 
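
To make that gap concrete, a rough sketch of the missing keyed-stream 
operations (the interface and method names are illustrative assumptions, not 
an existing or proposed API):

{code}
import java.util.Map;

public interface PairStream<K, V>
{
  // Group all tuples sharing a key (per window, once windowing semantics exist).
  PairStream<K, Iterable<V>> groupByKey();

  // Join two keyed streams on their common key.
  <W> PairStream<K, Map.Entry<V, W>> join(PairStream<K, W> other);
}
{code}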



> Identify overlap between Beam API and existing Apex Stream API
> --
>
> Key: APEXMALHAR-2099
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2099
> Project: Apache Apex Malhar
>  Issue Type: Sub-task
>Reporter: Ilya Ganelin
>
> There should be some overlap between the Beam API and the recently released 
> Apex Stream API. This task captures the need to understand and document this 
> overlap.
> AC:
> * A document or JIRA comment identifying which components of the Beam API are 
> implemented, similar, or absent within the Apex Stream API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXCORE-466) Improve logging from the *Agent.java files

2016-06-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310904#comment-15310904
 ] 

ASF GitHub Bot commented on APEXCORE-466:
-

Github user ilganeli commented on the issue:

https://github.com/apache/incubator-apex-core/pull/339
  
LGTM


> Improve logging from the *Agent.java files
> --
>
> Key: APEXCORE-466
> URL: https://issues.apache.org/jira/browse/APEXCORE-466
> Project: Apache Apex Core
>  Issue Type: Improvement
>Reporter: David Yan
>Assignee: David Yan
>
> Currently we are logging stack traces that do not necessarily indicate an 
> error.  For example: 
> {code}
> 2016-05-20 11:56:22,859 WARN com.datatorrent.stram.client.EventsAgent: Got 
> exception when reading events
> java.io.FileNotFoundException: File does not exist: 
> /user/david/datatorrent/apps/application_1462948052533_0204/events/index.txt
> at 
> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
> at 
> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
> at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1932)
> at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1873)
> at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1853)
> at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1825)
> at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:559)
> at 
> org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.j
> ava:87)
> at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorP
> B.java:363)
> at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.
> java)
> at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:415)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2038)
> {code}
> This stack trace only indicates that no events have been logged yet for the 
> application.
> We need to reduce this kind of logging to prevent false alarms to the user.
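
A sketch of the kind of change this implies (the class below is illustrative, 
not the actual diff in PR #339):

{code}
import java.io.FileNotFoundException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventsAgentLoggingSketch
{
  private static final Logger LOG = LoggerFactory.getLogger(EventsAgentLoggingSketch.class);

  // Hypothetical stand-in for EventsAgent's read of events/index.txt.
  private void readEventsIndex(String indexPath) throws FileNotFoundException
  {
    throw new FileNotFoundException("File does not exist: " + indexPath);
  }

  public void getLatestEvents(String indexPath)
  {
    try {
      readEventsIndex(indexPath);
    } catch (FileNotFoundException ex) {
      // A missing index only means no events have been logged yet for the
      // application, so log quietly instead of WARN with a full stack trace.
      LOG.debug("Events index {} does not exist yet; treating as no events", indexPath);
    }
  }
}
{code}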



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] incubator-apex-core issue #339: APEXCORE-466 #resolve improved logging from ...

2016-06-01 Thread ilganeli
Github user ilganeli commented on the issue:

https://github.com/apache/incubator-apex-core/pull/339
  
LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (APEXCORE-463) get-app-package-operators in ApexCLI not listing certain modules

2016-06-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310893#comment-15310893
 ] 

ASF GitHub Bot commented on APEXCORE-463:
-

Github user siyuanh commented on a diff in the pull request:

https://github.com/apache/incubator-apex-core/pull/338#discussion_r65421945
  
--- Diff: engine/src/main/java/com/datatorrent/stram/webapp/TypeGraph.java 
---
@@ -355,7 +356,8 @@ public int size()
 }
 Set<String> result = new TreeSet<>();
 for (TypeGraphVertex node : tgv.allInstantiableDescendants) {
-  if ((isAncestor(InputOperator.class.getName(), node.typeName) || 
!getAllInputPorts(node).isEmpty())) {
+  if ((isAncestor(InputOperator.class.getName(), node.typeName) || 
isAncestor(Module.class.getName(), node.typeName)
--- End diff --

 A method should do what its name suggests. Either change the name of this 
method or move the new check into a separate method. The same problem exists 
elsewhere in TypeGraph and in OperatorDiscoverer; please fix those as well. 
Otherwise it will be confusing and error-prone in the future.
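
Something like the following split would keep each method true to its name 
(the method names are suggestions; both helpers are meant to live inside 
TypeGraph and reuse its existing isAncestor/getAllInputPorts helpers):

{code}
private boolean isInstantiableOperator(TypeGraphVertex node)
{
  // Exactly the old check: an input operator, or anything exposing input ports.
  return isAncestor(InputOperator.class.getName(), node.typeName)
      || !getAllInputPorts(node).isEmpty();
}

private boolean isModule(TypeGraphVertex node)
{
  // The new module check, kept out of the operator query.
  return isAncestor(Module.class.getName(), node.typeName);
}
{code}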


> get-app-package-operators in ApexCLI  not listing certain modules
> -
>
> Key: APEXCORE-463
> URL: https://issues.apache.org/jira/browse/APEXCORE-463
> Project: Apache Apex Core
>  Issue Type: Bug
>Reporter: shubham pathak
>Assignee: shubham pathak
>
> FSInputModule is not being shown in output of  get-app-package-operators



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] incubator-apex-core pull request #338: APEXCORE-463 : fixed condition to all...

2016-06-01 Thread siyuanh
Github user siyuanh commented on a diff in the pull request:

https://github.com/apache/incubator-apex-core/pull/338#discussion_r65421945
  
--- Diff: engine/src/main/java/com/datatorrent/stram/webapp/TypeGraph.java 
---
@@ -355,7 +356,8 @@ public int size()
 }
 Set<String> result = new TreeSet<>();
 for (TypeGraphVertex node : tgv.allInstantiableDescendants) {
-  if ((isAncestor(InputOperator.class.getName(), node.typeName) || 
!getAllInputPorts(node).isEmpty())) {
+  if ((isAncestor(InputOperator.class.getName(), node.typeName) || 
isAncestor(Module.class.getName(), node.typeName)
--- End diff --

 A method should do what its name suggests. Either change the name of this 
method or move the new check into a separate method. The same problem exists 
elsewhere in TypeGraph and in OperatorDiscoverer; please fix those as well. 
Otherwise it will be confusing and error-prone in the future.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (APEXMALHAR-2103) scanner issues in FileSplitterInput class

2016-06-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310852#comment-15310852
 ] 

ASF GitHub Bot commented on APEXMALHAR-2103:


Github user DT-Priyanka commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/300#discussion_r65418317
  
--- Diff: 
library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java ---
@@ -155,6 +155,38 @@ public void testFileMetadata() throws 
InterruptedException
   }
 
   @Test
+  public void testFileMetadataForSingleFile() throws InterruptedException
--- End diff --

There is already a test named "testSingleFile"; can we rename this to 
something like "testScannerFilterForDuplicates"?


> scanner issues in FileSplitterInput class
> -
>
> Key: APEXMALHAR-2103
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2103
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Chaitanya
>Assignee: Chaitanya
>
> Issue: FileSplitter continuously emits file metadata even though there is  
> only a single file.
> Observation: For the same file, while updating and accessing the 
> referenceTimes map in FileSplitterInput and TimeBasedScanner, the keys are 
> different. Because of this, the oldestTimeModification is always null in 
> TimeBasedScanner.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] incubator-apex-malhar pull request #300: APEXMALHAR-2103 Fixed the scanner i...

2016-06-01 Thread DT-Priyanka
Github user DT-Priyanka commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/300#discussion_r65418498
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java ---
@@ -375,11 +375,14 @@ public void run()
 lastScannedInfo = null;
 numDiscoveredPerIteration = 0;
 for (String afile : files) {
-  String filePath = new File(afile).getAbsolutePath();
+  Path filePath = new Path(afile);
   LOG.debug("Scan started for input {}", filePath);
-  Map<String, Long> lastModifiedTimesForInputDir;
-  lastModifiedTimesForInputDir = referenceTimes.get(filePath);
-  scan(new Path(afile), null, lastModifiedTimesForInputDir);
+  Map<String, Long> lastModifiedTimesForInputDir = null;
+  if (fs.exists(filePath)) {
+    FileStatus fileStatus = fs.getFileStatus(filePath);
+    lastModifiedTimesForInputDir = referenceTimes.get(fileStatus.getPath().toUri().getPath());
--- End diff --

can you use "filePath" instead of fileStatus.getPath().


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-apex-malhar pull request #300: APEXMALHAR-2103 Fixed the scanner i...

2016-06-01 Thread DT-Priyanka
Github user DT-Priyanka commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/300#discussion_r65418317
  
--- Diff: 
library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java ---
@@ -155,6 +155,38 @@ public void testFileMetadata() throws 
InterruptedException
   }
 
   @Test
+  public void testFileMetadataForSingleFile() throws InterruptedException
--- End diff --

There is already a test named "testSingleFile"; can we rename this to 
something like "testScannerFilterForDuplicates"?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (APEXMALHAR-1978) Replace ${groupId} with ${project.groupId} in modules and project pom

2016-06-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310847#comment-15310847
 ] 

ASF GitHub Bot commented on APEXMALHAR-1978:


GitHub user ilganeli opened a pull request:

https://github.com/apache/incubator-apex-core/pull/348

[APEXMALHAR-1978] Replaced usages of {groupId} with {project.groupId}

* There were no instances of this within the Malhar project.
* Fixed usages within Core

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ilganeli/incubator-apex-core APEXMALHAR-1978

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-apex-core/pull/348.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #348


commit 5f8d3e6d37f5353a70922a998089d862846e1546
Author: Ilya Ganelin 
Date:   2016-06-01T18:31:22Z

Replaced usages of {groupId} with {project.groupId}

removed style change.




> Replace ${groupId} with ${project.groupId} in modules and project pom
> -
>
> Key: APEXMALHAR-1978
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1978
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Vlad Rozov
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] incubator-apex-core pull request #348: [APEXMALHAR-1978] Replaced usages of ...

2016-06-01 Thread ilganeli
GitHub user ilganeli opened a pull request:

https://github.com/apache/incubator-apex-core/pull/348

[APEXMALHAR-1978] Replaced usages of {groupId} with {project.groupId}

* There were no instances of this within the Malhar project.
* Fixed usages within Core

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ilganeli/incubator-apex-core APEXMALHAR-1978

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-apex-core/pull/348.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #348


commit 5f8d3e6d37f5353a70922a998089d862846e1546
Author: Ilya Ganelin 
Date:   2016-06-01T18:31:22Z

Replaced usages of {groupId} with {project.groupId}

removed style change.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (APEXMALHAR-537) Need a Session computation operator which works off of HTTP log

2016-06-01 Thread Ashwin Chandra Putta (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashwin Chandra Putta closed APEXMALHAR-537.
---
Resolution: Invalid

> Need a Session computation operator which works off of HTTP log
> ---
>
> Key: APEXMALHAR-537
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-537
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Chetan Narsude
>Assignee: Ashwin Chandra Putta
>  Labels: feature
>
> The session operator has various attributes: the average, median, and max 
> duration of a session; the number of sessions per user; the total number of 
> sessions, etc.
> Develop an operator which takes in the HTTP log stream and annotates the log 
> entries with the session they belong to.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (APEXMALHAR-281) [logstream] add alerts

2016-06-01 Thread Ashwin Chandra Putta (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashwin Chandra Putta closed APEXMALHAR-281.
---
Resolution: Invalid

> [logstream] add alerts
> --
>
> Key: APEXMALHAR-281
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-281
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Ashwin Chandra Putta
>Assignee: Ashwin Chandra Putta
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (APEXMALHAR-1150) create operator that can convert JSON string to object and vice versa

2016-06-01 Thread Ashwin Chandra Putta (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-1150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashwin Chandra Putta closed APEXMALHAR-1150.

Resolution: Won't Fix

> create operator that can convert JSON string to object and vice versa
> ---
>
> Key: APEXMALHAR-1150
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1150
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Ashwin Chandra Putta
>Assignee: Ashwin Chandra Putta
>
> Create an operator that can 
> 1. convert JSON byte[] or string into an object
> 2. serialize object as JSON



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-1150) create operator that can convert JSON string to object and vice versa

2016-06-01 Thread Ashwin Chandra Putta (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310830#comment-15310830
 ] 

Ashwin Chandra Putta commented on APEXMALHAR-1150:
--

correct, closing this.

> create operator that can convert JSON string to object and vice versa
> ---
>
> Key: APEXMALHAR-1150
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1150
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Ashwin Chandra Putta
>Assignee: Ashwin Chandra Putta
>
> Create an operator that can 
> 1. convert JSON byte[] or string into an object
> 2. serialize object as JSON



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (APEXMALHAR-1769) Hyperloglog operator

2016-06-01 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-1769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin resolved APEXMALHAR-1769.
--
Resolution: Duplicate

Duplicates APEXMALHAR-1822

> Hyperloglog operator
> 
>
> Key: APEXMALHAR-1769
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1769
> Project: Apache Apex Malhar
>  Issue Type: Task
>  Components: algorithms
>Reporter: Amol
>Assignee: Amol
>
> It would be nice to have a HyperLogLog operator



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Reopened] (APEXMALHAR-1769) Hyperloglog operator

2016-06-01 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-1769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin reopened APEXMALHAR-1769:
--

> Hyperloglog operator
> 
>
> Key: APEXMALHAR-1769
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1769
> Project: Apache Apex Malhar
>  Issue Type: Task
>  Components: algorithms
>Reporter: Amol
>Assignee: Amol
>
> It would be nice to have a HyperLogLog operator



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (APEXMALHAR-1769) Hyperloglog operator

2016-06-01 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-1769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin resolved APEXMALHAR-1769.
--
Resolution: Duplicate

This issue duplicates APEXMALHAR-1822

> Hyperloglog operator
> 
>
> Key: APEXMALHAR-1769
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1769
> Project: Apache Apex Malhar
>  Issue Type: Task
>  Components: algorithms
>Reporter: Amol
>Assignee: Amol
>
> It would be nice to have a HyperLogLog operator



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-1150) create operator that can convert JSON string to object and vice versa

2016-06-01 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310807#comment-15310807
 ] 

Ilya Ganelin commented on APEXMALHAR-1150:
--

This appears resolved between the JsonFormatter, JsonParser, and 
JsonByteArrayOperator classes. 

[~ashwinchandrap] can you confirm?

> create operator that can convert JSON string to object and vice versa
> ---
>
> Key: APEXMALHAR-1150
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1150
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Ashwin Chandra Putta
>Assignee: Ashwin Chandra Putta
>
> Create an operator that can 
> 1. convert JSON byte[] or string into an object
> 2. serialize object as JSON
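
For reference, a minimal Jackson-based sketch of the two conversions the task 
asks for (using Jackson here is my assumption; the JsonParser/JsonFormatter 
operators mentioned above wrap equivalent functionality):

{code}
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonRoundTrip
{
  public static class Pojo
  {
    public String name;
    public int count;
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    // 1. convert a JSON string (or byte[]) into an object
    Pojo pojo = mapper.readValue("{\"name\":\"apex\",\"count\":1}", Pojo.class);
    // 2. serialize the object back to JSON
    System.out.println(mapper.writeValueAsString(pojo));
  }
}
{code}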



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-175) KeyValPair isn't automatically sticky partitioned for key

2016-06-01 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310792#comment-15310792
 ] 

Ilya Ganelin commented on APEXMALHAR-175:
-

[~pra...@datatorrent.com] Which operators have this invalid usage?


> KeyValPair isn't automatically sticky partitioned for key
> -
>
> Key: APEXMALHAR-175
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-175
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Pramod Immaneni
>Assignee: Ilya Ganelin
>
> KeyValPair is being used in some operators with the assumption that it will 
> be sticky partitioned by key, but it doesn't override the hashCode function 
> to return the key's hash code. KeyValPair extends AbstractMap.SimpleEntry, 
> which overrides hashCode as a function of both key and value.
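
A sketch of the fix the description implies (whether to change KeyValPair 
itself or plug in a custom stream codec is an open design choice):

{code}
import java.util.AbstractMap;

public class KeyValPair<K, V> extends AbstractMap.SimpleEntry<K, V>
{
  public KeyValPair(K key, V value)
  {
    super(key, value);
  }

  @Override
  public int hashCode()
  {
    // Hash on the key alone so the default hash-based partitioning routes
    // all tuples with the same key to the same partition (sticky by key).
    return getKey() == null ? 0 : getKey().hashCode();
  }
}
{code}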



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2105) Enhance CSV Formatter to take in schema similar to Csv Parser

2016-06-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310787#comment-15310787
 ] 

ASF GitHub Bot commented on APEXMALHAR-2105:


Github user chinmaykolhatkar commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/306#discussion_r65412409
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java ---
@@ -58,215 +68,156 @@
 public class CsvFormatter extends Formatter
 {
 
-  private ArrayList<Field> fields;
-  @NotNull
-  protected String classname;
-  @NotNull
-  protected int fieldDelimiter;
-  protected String lineDelimiter;
+  /**
+   * Names of all the fields, in the same order in which they appear in
+   * output records
+   */
+  private transient String[] nameMapping;
+  /**
+   * Cell processors are an integral part of reading and writing with
+   * Super CSV; they automate the data type conversions and enforce
+   * constraints.
+   */
+  private transient CellProcessor[] processors;
+  /**
+   * Writing preferences that are passed through the schema
+   */
+  private transient CsvPreference preference;
 
+  /**
+   * Contents of the schema. The schema is specified in JSON format as per
+   * {@link DelimitedSchema}
+   */
   @NotNull
-  protected String fieldInfo;
+  private String schema;
+  /**
+   * Schema is read into this object to access its fields
+   */
+  private transient DelimitedSchema delimitedParserSchema;
 
-  public enum FIELD_TYPE
-  {
-BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
-  };
+  /**
+   * metric to keep count of the number of tuples emitted on the error port
+   */
+  @AutoMetric
+  protected long errorTupleCount;
+
+  /**
+   * metric to keep count of number of tuples emitted on out port
+   */
+  @AutoMetric
+  protected long emittedObjectCount;
 
-  protected transient String[] nameMapping;
-  protected transient CellProcessor[] processors;
-  protected transient CsvPreference preference;
+  /**
+   * metric to keep count of the number of tuples received on the input port
+   */
+  @AutoMetric
+  protected long incomingTuplesCount;
 
   public CsvFormatter()
   {
-fields = new ArrayList<Field>();
-fieldDelimiter = ',';
-lineDelimiter = "\r\n";
+  }
 
+  @Override
+  public void beginWindow(long windowId)
+  {
+super.beginWindow(windowId);
+errorTupleCount = 0;
+emittedObjectCount = 0;
+incomingTuplesCount = 0;
   }
 
   @Override
   public void setup(Context.OperatorContext context)
   {
 super.setup(context);
-
-//fieldInfo information
-fields = new ArrayList<Field>();
-String[] fieldInfoTuple = fieldInfo.split(",");
-for (int i = 0; i < fieldInfoTuple.length; i++) {
-  String[] fieldTuple = fieldInfoTuple[i].split(":");
-  Field field = new Field();
-  field.setName(fieldTuple[0]);
-  String[] typeFormat = fieldTuple[1].split("\\|");
-  field.setType(typeFormat[0].toUpperCase());
-  if (typeFormat.length > 1) {
-field.setFormat(typeFormat[1]);
-  }
-  getFields().add(field);
-}
-preference = new CsvPreference.Builder('"', fieldDelimiter, 
lineDelimiter).build();
-int countKeyValue = getFields().size();
-nameMapping = new String[countKeyValue];
-processors = new CellProcessor[countKeyValue];
-initialise(nameMapping, processors);
-
+delimitedParserSchema = new DelimitedSchema(schema);
+preference = new 
CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
+delimitedParserSchema.getDelimiterChar(), 
delimitedParserSchema.getLineDelimiter()).build();
+nameMapping = delimitedParserSchema.getFieldNames()
+.toArray(new String[delimitedParserSchema.getFieldNames().size()]);
+processors = getProcessor(delimitedParserSchema.getFields());
   }
 
-  private void initialise(String[] nameMapping, CellProcessor[] processors)
+  /**
+   * Returns array of cellprocessors, one for each field
+   */
+  private CellProcessor[] getProcessor(List<Field> fields)
   {
-for (int i = 0; i < getFields().size(); i++) {
-  FIELD_TYPE type = getFields().get(i).type;
-  nameMapping[i] = getFields().get(i).name;
-  if (type == FIELD_TYPE.DATE) {
-String dateFormat = getFields().get(i).format;
-processors[i] = new Optional(new FmtDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
+CellProcessor[] processor = new CellProcessor[fields.size()];
+

[GitHub] incubator-apex-malhar pull request #306: APEXMALHAR-2105 enhancing CSV forma...

2016-06-01 Thread chinmaykolhatkar
Github user chinmaykolhatkar commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/306#discussion_r65412409
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java ---
@@ -58,215 +68,156 @@
 public class CsvFormatter extends Formatter
 {
 
-  private ArrayList<Field> fields;
-  @NotNull
-  protected String classname;
-  @NotNull
-  protected int fieldDelimiter;
-  protected String lineDelimiter;
+  /**
+   * Names of all the fields, in the same order in which they appear in
+   * output records
+   */
+  private transient String[] nameMapping;
+  /**
+   * Cell processors are an integral part of reading and writing with
+   * Super CSV; they automate the data type conversions and enforce
+   * constraints.
+   */
+  private transient CellProcessor[] processors;
+  /**
+   * Writing preferences that are passed through the schema
+   */
+  private transient CsvPreference preference;
 
+  /**
+   * Contents of the schema. The schema is specified in JSON format as per
+   * {@link DelimitedSchema}
+   */
   @NotNull
-  protected String fieldInfo;
+  private String schema;
+  /**
+   * Schema is read into this object to access its fields
+   */
+  private transient DelimitedSchema delimitedParserSchema;
 
-  public enum FIELD_TYPE
-  {
-BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
-  };
+  /**
+   * metric to keep count of the number of tuples emitted on the error port
+   */
+  @AutoMetric
+  protected long errorTupleCount;
+
+  /**
+   * metric to keep count of number of tuples emitted on out port
+   */
+  @AutoMetric
+  protected long emittedObjectCount;
 
-  protected transient String[] nameMapping;
-  protected transient CellProcessor[] processors;
-  protected transient CsvPreference preference;
+  /**
+   * metric to keep count of the number of tuples received on the input port
+   */
+  @AutoMetric
+  protected long incomingTuplesCount;
 
   public CsvFormatter()
   {
-fields = new ArrayList<Field>();
-fieldDelimiter = ',';
-lineDelimiter = "\r\n";
+  }
 
+  @Override
+  public void beginWindow(long windowId)
+  {
+super.beginWindow(windowId);
+errorTupleCount = 0;
+emittedObjectCount = 0;
+incomingTuplesCount = 0;
   }
 
   @Override
   public void setup(Context.OperatorContext context)
   {
 super.setup(context);
-
-//fieldInfo information
-fields = new ArrayList<Field>();
-String[] fieldInfoTuple = fieldInfo.split(",");
-for (int i = 0; i < fieldInfoTuple.length; i++) {
-  String[] fieldTuple = fieldInfoTuple[i].split(":");
-  Field field = new Field();
-  field.setName(fieldTuple[0]);
-  String[] typeFormat = fieldTuple[1].split("\\|");
-  field.setType(typeFormat[0].toUpperCase());
-  if (typeFormat.length > 1) {
-field.setFormat(typeFormat[1]);
-  }
-  getFields().add(field);
-}
-preference = new CsvPreference.Builder('"', fieldDelimiter, 
lineDelimiter).build();
-int countKeyValue = getFields().size();
-nameMapping = new String[countKeyValue];
-processors = new CellProcessor[countKeyValue];
-initialise(nameMapping, processors);
-
+delimitedParserSchema = new DelimitedSchema(schema);
+preference = new 
CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
+delimitedParserSchema.getDelimiterChar(), 
delimitedParserSchema.getLineDelimiter()).build();
+nameMapping = delimitedParserSchema.getFieldNames()
+.toArray(new String[delimitedParserSchema.getFieldNames().size()]);
+processors = getProcessor(delimitedParserSchema.getFields());
   }
 
-  private void initialise(String[] nameMapping, CellProcessor[] processors)
+  /**
+   * Returns array of cellprocessors, one for each field
+   */
+  private CellProcessor[] getProcessor(List<Field> fields)
   {
-for (int i = 0; i < getFields().size(); i++) {
-  FIELD_TYPE type = getFields().get(i).type;
-  nameMapping[i] = getFields().get(i).name;
-  if (type == FIELD_TYPE.DATE) {
-String dateFormat = getFields().get(i).format;
-processors[i] = new Optional(new FmtDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
+CellProcessor[] processor = new CellProcessor[fields.size()];
+int fieldCount = 0;
+for (Field field : fields) {
+  if (field.getType() == FieldType.DATE) {
+String format = 
field.getConstraints().get(DelimitedSchema.DATE_FORMAT) == null ? null
+: 

[GitHub] incubator-apex-malhar pull request #306: APEXMALHAR-2105 enhancing CSV forma...

2016-06-01 Thread chinmaykolhatkar
Github user chinmaykolhatkar commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/306#discussion_r65412319
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java ---
@@ -58,215 +68,156 @@
 public class CsvFormatter extends Formatter
 {
 
-  private ArrayList<Field> fields;
-  @NotNull
-  protected String classname;
-  @NotNull
-  protected int fieldDelimiter;
-  protected String lineDelimiter;
+  /**
+   * Names of all the fields, in the same order in which they appear in
+   * output records
+   */
+  private transient String[] nameMapping;
+  /**
+   * Cell processors are an integral part of reading and writing with
+   * Super CSV; they automate the data type conversions and enforce
+   * constraints.
+   */
+  private transient CellProcessor[] processors;
+  /**
+   * Writing preferences that are passed through the schema
+   */
+  private transient CsvPreference preference;
 
+  /**
+   * Contents of the schema. The schema is specified in JSON format as per
+   * {@link DelimitedSchema}
+   */
   @NotNull
-  protected String fieldInfo;
+  private String schema;
+  /**
+   * Schema is read into this object to access its fields
+   */
+  private transient DelimitedSchema delimitedParserSchema;
 
-  public enum FIELD_TYPE
-  {
-BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
-  };
+  /**
+   * metric to keep count of the number of tuples emitted on the error port
+   */
+  @AutoMetric
+  protected long errorTupleCount;
+
+  /**
+   * metric to keep count of number of tuples emitted on out port
+   */
+  @AutoMetric
+  protected long emittedObjectCount;
 
-  protected transient String[] nameMapping;
-  protected transient CellProcessor[] processors;
-  protected transient CsvPreference preference;
+  /**
+   * metric to keep count of number of tuples received on input port
+   */
+  @AutoMetric
+  protected long incomingTuplesCount;
 
   public CsvFormatter()
   {
-fields = new ArrayList<Field>();
-fieldDelimiter = ',';
-lineDelimiter = "\r\n";
+  }
 
+  @Override
+  public void beginWindow(long windowId)
+  {
+super.beginWindow(windowId);
+errorTupleCount = 0;
+emittedObjectCount = 0;
+incomingTuplesCount = 0;
   }
 
   @Override
   public void setup(Context.OperatorContext context)
   {
 super.setup(context);
-
-//fieldInfo information
-fields = new ArrayList<Field>();
-String[] fieldInfoTuple = fieldInfo.split(",");
-for (int i = 0; i < fieldInfoTuple.length; i++) {
-  String[] fieldTuple = fieldInfoTuple[i].split(":");
-  Field field = new Field();
-  field.setName(fieldTuple[0]);
-  String[] typeFormat = fieldTuple[1].split("\\|");
-  field.setType(typeFormat[0].toUpperCase());
-  if (typeFormat.length > 1) {
-field.setFormat(typeFormat[1]);
-  }
-  getFields().add(field);
-}
-preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
-int countKeyValue = getFields().size();
-nameMapping = new String[countKeyValue];
-processors = new CellProcessor[countKeyValue];
-initialise(nameMapping, processors);
-
+delimitedParserSchema = new DelimitedSchema(schema);
+preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
+delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
+nameMapping = delimitedParserSchema.getFieldNames()
+.toArray(new String[delimitedParserSchema.getFieldNames().size()]);
+processors = getProcessor(delimitedParserSchema.getFields());
   }
 
-  private void initialise(String[] nameMapping, CellProcessor[] processors)
+  /**
+   * Returns array of cellprocessors, one for each field
+   */
+  private CellProcessor[] getProcessor(List<Field> fields)
   {
-for (int i = 0; i < getFields().size(); i++) {
-  FIELD_TYPE type = getFields().get(i).type;
-  nameMapping[i] = getFields().get(i).name;
-  if (type == FIELD_TYPE.DATE) {
-String dateFormat = getFields().get(i).format;
-processors[i] = new Optional(new FmtDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
+CellProcessor[] processor = new CellProcessor[fields.size()];
+int fieldCount = 0;
+for (Field field : fields) {
+  if (field.getType() == FieldType.DATE) {
+String format = field.getConstraints().get(DelimitedSchema.DATE_FORMAT) == null ? null
+: 
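
getProcessor() builds one Super CSV CellProcessor per schema field, wrapping date fields in a format-aware processor. Below is a minimal standalone sketch of the same idea, assuming a simplified Field holder in place of the real DelimitedSchema.Field:

import java.util.Arrays;
import java.util.List;
import org.supercsv.cellprocessor.FmtDate;
import org.supercsv.cellprocessor.Optional;
import org.supercsv.cellprocessor.ift.CellProcessor;

public class ProcessorSketch
{
  enum Type { DATE, STRING }

  // Simplified stand-in for the schema's Field: a type plus a date format.
  static class Field
  {
    final Type type;
    final String dateFormat; // null falls back to the default pattern below

    Field(Type type, String dateFormat)
    {
      this.type = type;
      this.dateFormat = dateFormat;
    }
  }

  static CellProcessor[] processorsFor(List<Field> fields)
  {
    CellProcessor[] processors = new CellProcessor[fields.size()];
    int i = 0;
    for (Field f : fields) {
      if (f.type == Type.DATE) {
        // Optional lets null cells pass through; FmtDate formats a java.util.Date
        processors[i++] = new Optional(new FmtDate(f.dateFormat == null ? "dd/MM/yyyy" : f.dateFormat));
      } else {
        processors[i++] = new Optional();
      }
    }
    return processors;
  }

  public static void main(String[] args)
  {
    CellProcessor[] p = processorsFor(Arrays.asList(
        new Field(Type.STRING, null), new Field(Type.DATE, "yyyy-MM-dd")));
    System.out.println(p.length + " processors built");
  }
}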

[jira] [Commented] (APEXMALHAR-2105) Enhance CSV Formatter to take in schema similar to Csv Parser

2016-06-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310776#comment-15310776
 ] 

ASF GitHub Bot commented on APEXMALHAR-2105:


Github user chinmaykolhatkar commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/306#discussion_r65411669
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java ---
@@ -58,215 +68,156 @@
 public class CsvFormatter extends Formatter<String>
 {
 
-  private ArrayList<Field> fields;
-  @NotNull
-  protected String classname;
-  @NotNull
-  protected int fieldDelimiter;
-  protected String lineDelimiter;
+  /**
+   * Names of all the fields in the same order that would appear in output
+   * records
+   */
+  private transient String[] nameMapping;
+  /**
+   * Cell processors are an integral part of reading and writing with Super CSV;
+   * they automate the data type conversions and enforce constraints.
+   */
+  private transient CellProcessor[] processors;
+  /**
+   * Writing preferences that are passed through schema
+   */
+  private transient CsvPreference preference;
 
+  /**
+   * Contents of the schema. The schema is specified in JSON format as per
+   * {@link DelimitedSchema}
+   */
   @NotNull
-  protected String fieldInfo;
+  private String schema;
+  /**
+   * Schema is read into this object to access fields
+   */
+  private transient DelimitedSchema delimitedParserSchema;
 
-  public enum FIELD_TYPE
-  {
-BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
-  };
+  /**
+   * metric to keep count of number of tuples emitted on error port
+   */
+  @AutoMetric
+  protected long errorTupleCount;
+
+  /**
+   * metric to keep count of number of tuples emitted on out port
+   */
+  @AutoMetric
+  protected long emittedObjectCount;
 
-  protected transient String[] nameMapping;
-  protected transient CellProcessor[] processors;
-  protected transient CsvPreference preference;
+  /**
+   * metric to keep count of number of tuples received on input port
+   */
+  @AutoMetric
+  protected long incomingTuplesCount;
 
   public CsvFormatter()
--- End diff --

You can remove the constructor as it is empty.


> Enhance CSV Formatter to take in schema similar to Csv Parser
> -
>
> Key: APEXMALHAR-2105
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2105
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: shubham pathak
>
> CSV Parser takes in a schema that specifies field names and constraints. CSV
> Formatter also needs the same information, but in the current implementation it
> takes it as "fieldInfo". Enhancing CSV Formatter to support the same schema
> as CSV Parser would make it simpler for the end user.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] incubator-apex-malhar pull request #306: APEXMALHAR-2105 enhancing CSV forma...

2016-06-01 Thread chinmaykolhatkar
Github user chinmaykolhatkar commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/306#discussion_r65411669
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java ---
@@ -58,215 +68,156 @@
 public class CsvFormatter extends Formatter<String>
 {
 
-  private ArrayList<Field> fields;
-  @NotNull
-  protected String classname;
-  @NotNull
-  protected int fieldDelimiter;
-  protected String lineDelimiter;
+  /**
+   * Names of all the fields in the same order that would appear in output
+   * records
+   */
+  private transient String[] nameMapping;
+  /**
+   * Cell processors are an integral part of reading and writing with Super CSV;
+   * they automate the data type conversions and enforce constraints.
+   */
+  private transient CellProcessor[] processors;
+  /**
+   * Writing preferences that are passed through schema
+   */
+  private transient CsvPreference preference;
 
+  /**
+   * Contents of the schema. The schema is specified in JSON format as per
+   * {@link DelimitedSchema}
+   */
   @NotNull
-  protected String fieldInfo;
+  private String schema;
+  /**
+   * Schema is read into this object to access fields
+   */
+  private transient DelimitedSchema delimitedParserSchema;
 
-  public enum FIELD_TYPE
-  {
-BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
-  };
+  /**
+   * metric to keep count of number of tuples emitted on error port
+   */
+  @AutoMetric
+  protected long errorTupleCount;
+
+  /**
+   * metric to keep count of number of tuples emitted on out port
+   */
+  @AutoMetric
+  protected long emittedObjectCount;
 
-  protected transient String[] nameMapping;
-  protected transient CellProcessor[] processors;
-  protected transient CsvPreference preference;
+  /**
+   * metric to keep count of number of tuples received on input port
+   */
+  @AutoMetric
+  protected long incomingTuplesCount;
 
   public CsvFormatter()
--- End diff --

You can remove the constructor as it is empty.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
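
The nameMapping, processors, and preference members above cooperate at write time. Here is a self-contained sketch using the plain Super CSV API (not the operator itself; the field names are made up):

import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;
import org.supercsv.cellprocessor.Optional;
import org.supercsv.cellprocessor.ift.CellProcessor;
import org.supercsv.io.CsvMapWriter;
import org.supercsv.prefs.CsvPreference;

public class WriteSketch
{
  public static void main(String[] args) throws Exception
  {
    String[] nameMapping = {"adId", "campaignId"};
    CellProcessor[] processors = {new Optional(), new Optional()};
    CsvPreference preference = new CsvPreference.Builder('"', ',', "\n").build();

    Map<String, Object> record = new HashMap<>();
    record.put("adId", 1);
    record.put("campaignId", 42);

    StringWriter out = new StringWriter();
    try (CsvMapWriter writer = new CsvMapWriter(out, preference)) {
      // Column order comes from nameMapping; each cell runs through its processor.
      writer.write(record, nameMapping, processors);
    }
    System.out.print(out); // prints: 1,42
  }
}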


[jira] [Commented] (APEXMALHAR-2105) Enhance CSV Formatter to take in schema similar to Csv Parser

2016-06-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310775#comment-15310775
 ] 

ASF GitHub Bot commented on APEXMALHAR-2105:


Github user chinmaykolhatkar commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/306#discussion_r65411586
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java ---
@@ -58,215 +68,156 @@
 public class CsvFormatter extends Formatter<String>
 {
 
-  private ArrayList<Field> fields;
-  @NotNull
-  protected String classname;
-  @NotNull
-  protected int fieldDelimiter;
-  protected String lineDelimiter;
+  /**
+   * Names of all the fields in the same order that would appear in output
+   * records
+   */
+  private transient String[] nameMapping;
+  /**
+   * Cell processors are an integral part of reading and writing with Super CSV;
+   * they automate the data type conversions and enforce constraints.
+   */
+  private transient CellProcessor[] processors;
+  /**
+   * Writing preferences that are passed through schema
+   */
+  private transient CsvPreference preference;
 
+  /**
+   * Contents of the schema. The schema is specified in JSON format as per
+   * {@link DelimitedSchema}
+   */
   @NotNull
-  protected String fieldInfo;
+  private String schema;
+  /**
+   * Schema is read into this object to access fields
+   */
+  private transient DelimitedSchema delimitedParserSchema;
 
-  public enum FIELD_TYPE
-  {
-BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
-  };
+  /**
+   * metric to keep count of number of tuples emitted on error port
+   */
+  @AutoMetric
+  protected long errorTupleCount;
+
+  /**
+   * metric to keep count of number of tuples emitted on out port
+   */
+  @AutoMetric
+  protected long emittedObjectCount;
 
-  protected transient String[] nameMapping;
-  protected transient CellProcessor[] processors;
-  protected transient CsvPreference preference;
+  /**
+   * metric to keep count of number of tuples received on input port
+   */
+  @AutoMetric
+  protected long incomingTuplesCount;
--- End diff --

Can you make this private?


> Enhance CSV Formatter to take in schema similar to Csv Parser
> -
>
> Key: APEXMALHAR-2105
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2105
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: shubham pathak
>
> CSV Parser takes in a schema that specifies field names and constraints. CSV
> Formatter also needs the same information, but in the current implementation it
> takes it as "fieldInfo". Enhancing CSV Formatter to support the same schema
> as CSV Parser would make it simpler for the end user.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Reopened] (APEXMALHAR-175) KeyValPair isn't automatically sticky partitioned for key

2016-06-01 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin reopened APEXMALHAR-175:
-
  Assignee: Ilya Ganelin

This issue shouldn't be closed until usage of KeyValPair is replaced with 
KeyHashValPair where possible.

> KeyValPair isn't automatically sticky partitioned for key
> -
>
> Key: APEXMALHAR-175
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-175
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Pramod Immaneni
>Assignee: Ilya Ganelin
>
> KeyValPair is being used in some operators with the assumption that it will
> be sticky partitioned by key, but it doesn't override the hashCode function
> to return the key's hash code. KeyValPair extends AbstractMap.SimpleEntry, whose
> hashCode is a function of both key and value.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
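
The key-only hashing that sticky partitioning needs looks roughly like the sketch below; it is modeled on what a KeyHashValPair-style class does, not copied from the Malhar source:

import java.util.AbstractMap;

// Sketch: a pair whose hashCode depends only on the key, so hash-based
// (sticky) partitioning routes every value for a given key to the same
// partition regardless of the value.
public class KeyOnlyHashPair<K, V> extends AbstractMap.SimpleEntry<K, V>
{
  private static final long serialVersionUID = 1L;

  public KeyOnlyHashPair(K key, V value)
  {
    super(key, value);
  }

  @Override
  public int hashCode()
  {
    return getKey() == null ? 0 : getKey().hashCode();
  }
}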


[jira] [Comment Edited] (APEXMALHAR-175) KeyValPair isn't automatically sticky partitioned for key

2016-06-01 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310767#comment-15310767
 ] 

Ilya Ganelin edited comment on APEXMALHAR-175 at 6/1/16 6:00 PM:
-

This issue is resolved by using the existing KeyHashValPair class instead.


was (Author: ilganeli):
This issue is resolved by using the KeyHashValPair class instead.

> KeyValPair isn't automatically sticky partitioned for key
> -
>
> Key: APEXMALHAR-175
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-175
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Pramod Immaneni
>
> KeyValPair is being used in some operators with the assumption that it will
> be sticky partitioned by key, but it doesn't override the hashCode function
> to return the key's hash code. KeyValPair extends AbstractMap.SimpleEntry, whose
> hashCode is a function of both key and value.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] incubator-apex-malhar pull request #306: APEXMALHAR-2105 enhancing CSV forma...

2016-06-01 Thread chinmaykolhatkar
Github user chinmaykolhatkar commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/306#discussion_r65411586
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java ---
@@ -58,215 +68,156 @@
 public class CsvFormatter extends Formatter<String>
 {
 
-  private ArrayList<Field> fields;
-  @NotNull
-  protected String classname;
-  @NotNull
-  protected int fieldDelimiter;
-  protected String lineDelimiter;
+  /**
+   * Names of all the fields in the same order that would appear in output
+   * records
+   */
+  private transient String[] nameMapping;
+  /**
+   * Cell processors are an integral part of reading and writing with Super CSV;
+   * they automate the data type conversions and enforce constraints.
+   */
+  private transient CellProcessor[] processors;
+  /**
+   * Writing preferences that are passed through schema
+   */
+  private transient CsvPreference preference;
 
+  /**
+   * Contents of the schema. The schema is specified in JSON format as per
+   * {@link DelimitedSchema}
+   */
   @NotNull
-  protected String fieldInfo;
+  private String schema;
+  /**
+   * Schema is read into this object to access fields
+   */
+  private transient DelimitedSchema delimitedParserSchema;
 
-  public enum FIELD_TYPE
-  {
-BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
-  };
+  /**
+   * metric to keep count of number of tuples emitted on error port
+   */
+  @AutoMetric
+  protected long errorTupleCount;
+
+  /**
+   * metric to keep count of number of tuples emitted on out port
+   */
+  @AutoMetric
+  protected long emittedObjectCount;
 
-  protected transient String[] nameMapping;
-  protected transient CellProcessor[] processors;
-  protected transient CsvPreference preference;
+  /**
+   * metric to keep count of number of tuples received on input port
+   */
+  @AutoMetric
+  protected long incomingTuplesCount;
--- End diff --

Can you make this private?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (APEXMALHAR-998) Compilation error while using UniqueValueCount operator.

2016-06-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310546#comment-15310546
 ] 

ASF GitHub Bot commented on APEXMALHAR-998:
---

Github user asfgit closed the pull request at:

https://github.com/apache/incubator-apex-malhar/pull/305


> Compilation error while using UniqueValueCount operator.
> 
>
> Key: APEXMALHAR-998
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-998
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Tushar Gosavi
>Assignee: Tushar Gosavi
> Fix For: 3.5.0
>
>
> Got compilation error while using UniqueValueCount operator,
> [ERROR] bootstrap class path not set in conjunction with -source 1.6
> /home/tushar/work/github/Malhar/demos/src/main/java/com/datatorrent/demos/uniquevaluetest/Application.java:[31,11]
>  error: no suitable method found for 
> addStream(String,DefaultOutputPort>,DefaultInputPort>)
> The problem is that the KeyValPair type of the stream is different from the
> type used in the operator; the second generic type parameter needs to be
> extended from Object for the types to match.
> The fix is
> --- a/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java
> +++ b/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java
> @@ -46,10 +46,9 @@ public class UniqueValueCount<K> extends BaseOperator {
>  
>  
>  @InputPortFieldAnnotation(name="inputPort")
> -public transient DefaultInputPort<KeyValPair<K,Object>> inputPort = new DefaultInputPort<KeyValPair<K,Object>>() {
> -
> +public transient DefaultInputPort<KeyValPair<K,? extends Object>> inputPort = new DefaultInputPort<KeyValPair<K,? extends Object>>() {
>  @Override
> -public void process(KeyValPair<K,Object> pair) {
> +public void process(KeyValPair<K,? extends Object> pair) {
>  Set<Object> values = interimUniqueValues.get(pair.getKey());
>  if(values==null){
>  values=Sets.newHashSet();



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
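
The wildcard fixes the addStream mismatch because of ordinary Java generics invariance. A minimal self-contained illustration (Port and connect are stand-ins here, not the Apex API):

import java.util.AbstractMap.SimpleEntry;

public class WildcardDemo
{
  // Stand-in for an input port parameterized on the tuple type.
  interface Port<T> { void process(T tuple); }

  // A sink typed with a concrete second parameter...
  static Port<SimpleEntry<String, Object>> strict = t -> { };

  // ...versus one whose second parameter is widened to "? extends Object".
  static Port<SimpleEntry<String, ? extends Object>> loose = t -> { };

  // Mirrors addStream: the sink must accept whatever the source emits.
  static <T> void connect(Port<? super T> sink, T sample)
  {
    sink.process(sample);
  }

  public static void main(String[] args)
  {
    SimpleEntry<String, Integer> tuple = new SimpleEntry<>("k", 1);
    // connect(strict, tuple); // does not compile: SimpleEntry<String, Object>
    //                         // is not a supertype of SimpleEntry<String, Integer>
    connect(loose, tuple);     // compiles: the wildcard admits Integer values
    System.out.println("connected");
  }
}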


[jira] [Resolved] (APEXMALHAR-998) Compilation error while using UniqueValueCount operator.

2016-06-01 Thread Thomas Weise (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise resolved APEXMALHAR-998.
-
   Resolution: Fixed
Fix Version/s: 3.5.0

> Compilation error while using UniqueValueCount operator.
> 
>
> Key: APEXMALHAR-998
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-998
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Tushar Gosavi
>Assignee: Tushar Gosavi
> Fix For: 3.5.0
>
>
> Got compilation error while using UniqueValueCount operator,
> [ERROR] bootstrap class path not set in conjunction with -source 1.6
> /home/tushar/work/github/Malhar/demos/src/main/java/com/datatorrent/demos/uniquevaluetest/Application.java:[31,11]
>  error: no suitable method found for 
> addStream(String,DefaultOutputPort>,DefaultInputPort>)
> The problem is that the KeyValPair type of the stream is different from the
> type used in the operator; the second generic type parameter needs to be
> extended from Object for the types to match.
> The fix is
> --- a/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java
> +++ b/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java
> @@ -46,10 +46,9 @@ public class UniqueValueCount<K> extends BaseOperator {
>  
>  
>  @InputPortFieldAnnotation(name="inputPort")
> -public transient DefaultInputPort<KeyValPair<K,Object>> inputPort = new DefaultInputPort<KeyValPair<K,Object>>() {
> -
> +public transient DefaultInputPort<KeyValPair<K,? extends Object>> inputPort = new DefaultInputPort<KeyValPair<K,? extends Object>>() {
>  @Override
> -public void process(KeyValPair<K,Object> pair) {
> +public void process(KeyValPair<K,? extends Object> pair) {
>  Set<Object> values = interimUniqueValues.get(pair.getKey());
>  if(values==null){
>  values=Sets.newHashSet();



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] incubator-apex-malhar pull request #305: APEXMALHAR-998 extend second type p...

2016-06-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-apex-malhar/pull/305


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (APEXMALHAR-2105) Enhance CSV Formatter to take in schema similar to Csv Parser

2016-06-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310293#comment-15310293
 ] 

ASF GitHub Bot commented on APEXMALHAR-2105:


GitHub user shubham-pathak22 opened a pull request:

https://github.com/apache/incubator-apex-malhar/pull/306

APEXMALHAR-2105 enhancing CSV formatter to read field info from schema



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shubham-pathak22/incubator-apex-malhar 
APEXMALHAR-2105

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-apex-malhar/pull/306.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #306


commit a2a863746613c9b4c594949b5b188b138b145395
Author: shubham 
Date:   2016-06-01T13:17:36Z

APEXMALHAR-2105 enhancing CSV formatter to read field info from schema




> Enhance CSV Formatter to take in schema similar to Csv Parser
> -
>
> Key: APEXMALHAR-2105
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2105
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: shubham pathak
>
> CSV Parser takes in a schema that specifies field names and constraints. CSV
> Formatter also needs the same information, but in the current implementation it
> takes it as "fieldInfo". Enhancing CSV Formatter to support the same schema
> as CSV Parser would make it simpler for the end user.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
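
For a sense of what the shared schema looks like, below is a sketch of a schema string in the DelimitedSchema style; the exact JSON keys (separator, quoteChar, lineDelimiter, fields, constraints) are assumptions here, not confirmed by this thread:

public class SchemaExample
{
  public static void main(String[] args)
  {
    // Field names, types, and an optional date format in one JSON document;
    // the same string could then drive both the parser and the formatter.
    String schema = "{"
        + "\"separator\": \",\","
        + "\"quoteChar\": \"\\\"\","
        + "\"lineDelimiter\": \"\\n\","
        + "\"fields\": ["
        + "{\"name\": \"campaignId\", \"type\": \"Integer\"},"
        + "{\"name\": \"startDate\", \"type\": \"Date\","
        + "\"constraints\": {\"format\": \"dd/MM/yyyy\"}}"
        + "]}";
    System.out.println(schema);
  }
}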


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-998 extend second type parame...

2016-06-01 Thread tushargosavi
GitHub user tushargosavi opened a pull request:

https://github.com/apache/incubator-apex-malhar/pull/305

APEXMALHAR-998 extend second type parameter from Object



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tushargosavi/incubator-apex-malhar 
APEXMALHAR-998

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-apex-malhar/pull/305.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #305


commit e809bee20b0761e32bf4b1213e0dafa2791178de
Author: Tushar R. Gosavi 
Date:   2016-06-01T09:32:27Z

APEXMALHAR-998 extend second type parameter from Object




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (APEXMALHAR-2104) input ports in BytesFileOutputOperator should be optional

2016-06-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309950#comment-15309950
 ] 

ASF GitHub Bot commented on APEXMALHAR-2104:


GitHub user yogidevendra opened a pull request:

https://github.com/apache/incubator-apex-malhar/pull/304

APEXMALHAR-2104-making input ports optional

1. Using variable shadowing to make the port optional.
2. Marking BytesFileOutputOperator as @evolving

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yogidevendra/incubator-apex-malhar 
APEXMALHAR-2104-BytesFileOutput-optional-port

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-apex-malhar/pull/304.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #304


commit 85bdecf5d66b907f7f17173668afd6e24d88406c
Author: yogidevendra 
Date:   2016-06-01T06:35:31Z

APEXMALHAR-2104-making input ports optional

1. Using variable shadowing to make the port optional.
2. Marking this as @evolving




> input ports in BytesFileOutputOperator should be optional
> -
>
> Key: APEXMALHAR-2104
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2104
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Yogi Devendra
>Assignee: Yogi Devendra
>Priority: Minor
>
> BytesFileOutputOperator has two input ports: one for byte[] input, the other for
> String input.
> We should allow app developers to connect to either one of these ports.
> Thus, both streams should be marked as optional.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2104-making input ports optio...

2016-06-01 Thread yogidevendra
GitHub user yogidevendra opened a pull request:

https://github.com/apache/incubator-apex-malhar/pull/304

APEXMALHAR-2104-making input ports optional

1. Using variable shadowing to make the port optional.
2. Marking BytesFileOutputOperator as @evolving

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yogidevendra/incubator-apex-malhar 
APEXMALHAR-2104-BytesFileOutput-optional-port

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-apex-malhar/pull/304.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #304


commit 85bdecf5d66b907f7f17173668afd6e24d88406c
Author: yogidevendra 
Date:   2016-06-01T06:35:31Z

APEXMALHAR-2104-making input ports optional

1. Using variable shadowing to make the port optional.
2. Marking this as @evolving




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (APEXMALHAR-2104) input ports in BytesFileOutputOperator should be optional

2016-06-01 Thread Yogi Devendra (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yogi Devendra updated APEXMALHAR-2104:
--
Priority: Minor  (was: Major)

> input ports in BytesFileOutputOperator should be optional
> -
>
> Key: APEXMALHAR-2104
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2104
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Yogi Devendra
>Assignee: Yogi Devendra
>Priority: Minor
>
> BytesFileOutputOperator has two input ports: one for byte[] input, the other for
> String input.
> We should allow app developers to connect to either one of these ports.
> Thus, both streams should be marked as optional.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2104) input ports in BytesFileOutputOperator should be optional

2016-06-01 Thread Yogi Devendra (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309364#comment-15309364
 ] 

Yogi Devendra commented on APEXMALHAR-2104:
---

The input port used for byte[] is defined in the parent class. Since there is no
direct way to override annotations for an input port defined in the parent
class, this port is shadowed using a port with the same name in
BytesFileOutputOperator.
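
A rough sketch of the shadowing technique described above; the class shape and the optional attribute are assumptions for illustration, not the exact Malhar code:

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;

// Parent declares the port without marking it optional.
abstract class ParentFileOutput
{
  public transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>()
  {
    @Override
    public void process(byte[] tuple)
    {
      processTuple(tuple);
    }
  };

  protected abstract void processTuple(byte[] tuple);
}

// Child shadows the field with the same name, so the DAG picks up the
// annotated declaration and treats the port as optional.
class ChildFileOutput extends ParentFileOutput
{
  @InputPortFieldAnnotation(optional = true)
  public final transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>()
  {
    @Override
    public void process(byte[] tuple)
    {
      processTuple(tuple);
    }
  };

  @Override
  protected void processTuple(byte[] tuple)
  {
    // write the bytes out...
  }
}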

> input ports in BytesFileOutputOperator should be optional
> -
>
> Key: APEXMALHAR-2104
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2104
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Yogi Devendra
>Assignee: Yogi Devendra
>
> BytesFileOutputOperator has two input ports: one for byte[] input, the other for
> String input.
> We should allow app developers to connect to either one of these ports.
> Thus, both streams should be marked as optional.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)