[GitHub] incubator-apex-malhar pull request #300: APEXMALHAR-2103 Fixed the scanner i...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/300#discussion_r65475558

--- Diff: library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java ---
@@ -375,11 +375,14 @@ public void run()
       lastScannedInfo = null;
       numDiscoveredPerIteration = 0;
       for (String afile : files) {
-        String filePath = new File(afile).getAbsolutePath();
+        Path filePath = new Path(afile);
         LOG.debug("Scan started for input {}", filePath);
-        Map lastModifiedTimesForInputDir;
-        lastModifiedTimesForInputDir = referenceTimes.get(filePath);
-        scan(new Path(afile), null, lastModifiedTimesForInputDir);
+        Map lastModifiedTimesForInputDir = null;
+        if (fs.exists(filePath)) {
+          FileStatus fileStatus = fs.getFileStatus(filePath);
+          lastModifiedTimesForInputDir = referenceTimes.get(fileStatus.getPath().toUri().getPath());
--- End diff --

No. In createScannedFileInfo(), the directory/file path is stored as an absolute path, but on the local file system the input files may be given as relative paths.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
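The key mismatch chaithu14 describes (the same input arriving as a relative path in one place and an absolute path in another) can be sketched with plain java.nio.file. The class and method names below are illustrative and are not part of FileSplitterInput; the point is only that map lookups keyed by raw path strings miss unless both sides normalize the same way.

```java
import java.nio.file.Paths;

// Hypothetical illustration: a referenceTimes-style map keyed by path string
// only works if every caller normalizes the path to one canonical spelling.
public class PathKeyNormalization
{
  // Normalize any user-supplied path string to an absolute, canonical key.
  public static String toKey(String rawPath)
  {
    return Paths.get(rawPath).toAbsolutePath().normalize().toString();
  }

  public static void main(String[] args)
  {
    // Both spellings of the same directory collapse to one map key.
    System.out.println(toKey("data/in").equals(toKey("./data/in"))); // prints "true"
  }
}
```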
[jira] [Commented] (APEXMALHAR-2106) Support merging multiple streams with StreamMerger
[ https://issues.apache.org/jira/browse/APEXMALHAR-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311597#comment-15311597 ] ASF GitHub Bot commented on APEXMALHAR-2106:

Github user ilganeli commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/307#discussion_r65474494

--- Diff: library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java ---
@@ -41,35 +47,57 @@
 @Stateless
 public class StreamMerger extends BaseOperator
 {
-  /**
-   * Data input port 1.
-   */
-  public final transient DefaultInputPort data1 = new DefaultInputPort()
+  private int portCount = 0;
+  private final transient ArrayList ports = new ArrayList<>(portCount);
--- End diff --

A great question; I was in the process of doing so. The short answer is that it fails wonderfully:

```
java.lang.IllegalArgumentException: Port is not associated to any operator in the DAG: com.datatorrent.lib.stream.StreamMerger$1@3b084709
at com.datatorrent.stram.plan.logical.LogicalPlan.assertGetPortMeta(LogicalPlan.java:1501)
at com.datatorrent.stram.plan.logical.LogicalPlan.access$1000(LogicalPlan.java:119)
```

The relevant question, then, is whether there is any way to define ports like this (e.g. as an array of ports) such that the DAG is aware of them.

> Support merging multiple streams with StreamMerger
> ---
>
> Key: APEXMALHAR-2106
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2106
> Project: Apache Apex Malhar
> Issue Type: New Feature
> Reporter: Ilya Ganelin
> Assignee: Ilya Ganelin
>
> To properly implement the Flatten transformation (and other Stream
> combination operations), Apex must support merging data from multiple
> sources. The StreamMerger operator can be improved to merge multiple streams,
> rather than just the two streams it can handle in the present implementation.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
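The failure ilganeli reports is consistent with field-based port discovery: if the engine finds ports by reflecting over an operator's declared fields, ports stored inside a collection have no field of a port type to discover. The following is a toy sketch of that mechanism, with hypothetical class names; it is not the actual LogicalPlan code.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Toy illustration: a reflection scan over declared fields sees only fields
// whose declared type is a port, so ports inside a List are invisible to it.
public class PortDiscoverySketch
{
  static class InputPort {}  // stand-in for DefaultInputPort

  static class FixedPortsOperator
  {
    final InputPort data1 = new InputPort();  // discoverable field
    final InputPort data2 = new InputPort();  // discoverable field
  }

  static class ListPortsOperator
  {
    // One List field, no fields of a port type.
    final List<InputPort> ports = new ArrayList<>();

    ListPortsOperator()
    {
      ports.add(new InputPort());
      ports.add(new InputPort());
    }
  }

  // Count declared fields whose type is a port, as a field scan would.
  static int discoveredPorts(Object operator)
  {
    int count = 0;
    for (Field f : operator.getClass().getDeclaredFields()) {
      if (InputPort.class.isAssignableFrom(f.getType())) {
        count++;
      }
    }
    return count;
  }

  public static void main(String[] args)
  {
    System.out.println(discoveredPorts(new FixedPortsOperator())); // prints "2"
    System.out.println(discoveredPorts(new ListPortsOperator()));  // prints "0"
  }
}
```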
[jira] [Commented] (APEXMALHAR-2106) Support merging multiple streams with StreamMerger
[ https://issues.apache.org/jira/browse/APEXMALHAR-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311572#comment-15311572 ] ASF GitHub Bot commented on APEXMALHAR-2106:

Github user gauravgopi123 commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/307#discussion_r65472797

--- Diff: library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java ---
@@ -41,35 +47,57 @@
 @Stateless
 public class StreamMerger extends BaseOperator
 {
-  /**
-   * Data input port 1.
-   */
-  public final transient DefaultInputPort data1 = new DefaultInputPort()
+  private int portCount = 0;
+  private final transient ArrayList ports = new ArrayList<>(portCount);
--- End diff --

Did you use this operator in a DAG and see if it works?

> Support merging multiple streams with StreamMerger
> ---
>
> Key: APEXMALHAR-2106
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2106
> Project: Apache Apex Malhar
> Issue Type: New Feature
> Reporter: Ilya Ganelin
> Assignee: Ilya Ganelin
>
> To properly implement the Flatten transformation (and other Stream
> combination operations), Apex must support merging data from multiple
> sources. The StreamMerger operator can be improved to merge multiple streams,
> rather than just the two streams it can handle in the present implementation.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (APEXMALHAR-2099) Identify overlap between Beam API and existing Apex Stream API
[ https://issues.apache.org/jira/browse/APEXMALHAR-2099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311571#comment-15311571 ] Ilya Ganelin edited comment on APEXMALHAR-2099 at 6/2/16 1:45 AM: -- [~siyuan][~thw] Thanks for the comments. I've been following along all the linked work - Thomas, I've also looked into your Runner in detail. Based on the discussion here, the work in https://github.com/apache/incubator-beam/compare/master...tweise:master and in https://issues.apache.org/jira/browse/APEXMALHAR-2085, this is a fantastic start! Siyuan - it sounds like there's a clear agreement that we want to break down the transformations into windowed and un-windowed transforms, I think the next logical step is to build the Windowed Operator. This would allow us to implement a number of the transforms already present in the Runner as windowed transformations and subsequently implement trigger semantics. I'm obviously not down on the ground with you guys, so I don't know in detail who is working on what. If possible, I would propose that we work together to formulate a set of concrete JIRAs that lay out the next steps so that we are not replicating effort. It might even make sense to do this in person. Then, we can split up the labor and move forward in tackling specific problems. was (Author: ilganeli): [~siyuan][~thw] Thanks for the comments. I've been following along all the linked work - Thomas, I've also looked into your Runner in detail. Based on the discussion here, the work in https://github.com/apache/incubator-beam/compare/master...tweise:master and in https://issues.apache.org/jira/browse/APEXMALHAR-2085. This is a fantastic start! Siyuan - it sounds like there's a clear agreement that we want to break down the transformations into windowed and un-windowed transforms, I think the next logical step is to build the Windowed Operator. 
This would allow us to implement a number of the transforms already present in the Runner as windowed transformations and subsequently implement trigger semantics. I'm obviously not down on the ground with you guys, so I don't know in detail who is working on what. If possible, I would propose that we work together to formulate a set of concrete JIRAs that lay out the next steps so that we are not replicating effort. It might even make sense to do this in person. Then, we can split up the labor and move forward in tackling specific problems. > Identify overlap between Beam API and existing Apex Stream API > -- > > Key: APEXMALHAR-2099 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2099 > Project: Apache Apex Malhar > Issue Type: Sub-task >Reporter: Ilya Ganelin > > There should be some overlap between the Beam API and the recently released > Apex Stream API. This task captures the need to understand and document this > overlap. > AC: > * A document or JIRA comment identifying which components of the Beam API are > implement, similar, or absent within the Apex Stream API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (APEXMALHAR-2085) Implement Windowed Operators
[ https://issues.apache.org/jira/browse/APEXMALHAR-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311123#comment-15311123 ] David Yan edited comment on APEXMALHAR-2085 at 6/1/16 9:43 PM: --- Based on my understanding, this looks like what needs to be done in a very high level. Assuming T is the type of the tuples: 1. Watermark generator operator that takes T as the input and generate TimeStampedValue tuples with watermark tuples. The watermark generator takes the following as configuration. - The function to get the timestamp, equivalent of lambda T -> milliseconds from epoch - watermark type (perfect, heuristic, etc). I need a little more research on how watermark is actually generated 2. A modified DimensionOperator. This has two stages: * Stage 1: Window generator that takes TimestampedValue as the input and generate WindowedValue (WindowedValue is an abstract class from Beam, which has the window information for the tuple). ** The WindowFn object to assign the window(s) for each tuple ** Possibility of merging windows * Stage 2: The actual pane generation and takes WindowedValue as the input and WindowedValue as the output. The output includes any retraction values (as a result from lateness and window merging) This operator takes the following as configuration: - The Windowing Function (type WindowFn): Describes the windowing (global? fixed? session? session gap duration?) - Accumulation mode (type Enum): Accumulating, Discarding or Accumulating & Retracting - Allowed lateness (type Duration): For dropping late tuples and purging old state (in conjunction of committed checkpoint) - The Aggregation (type lambda Iterable -> R): How we want to aggregate the tuple data. - Triggering (type Trigger): When we actually output the result to the output port The DimensionOperator will need to implement the above features. 
Also, the DimensionOperator will need to see whether T is actually a KV (instanceof) and if so, the tuples are aggregated by window AND key. This is very preliminary and it's possible that I'm going down the wrong path. So please provide your feedback. was (Author: davidyan): Based on my understanding, this looks like what needs to be done in a very high level. Assuming T is the type of the tuples: 1. Watermark generator operator that takes T as the input and generate TimeStampedValue tuples with watermark tuples. The watermark generator takes the following as configuration. - The function to get the timestamp, equivalent of lambda T -> milliseconds from epoch - watermark type (perfect, heuristic, etc). I need a little more research on how watermark is actually generated 2. A modified DimensionOperator. This has two stages: * Stage 1: Window generator that takes TimestampedValue as the input and generate WindowedValue (WindowedValue is an abstract class from Beam, which has the window information for the tuple). ** The WindowFn object to assign the window(s) for each tuple ** Possibility of merging windows * Stage 2: The actual pane generation and takes WindowedValue as the input and WindowedValue as the output. The output includes any retraction values (as a result from lateness and window merging) This operator takes the following as configuration: - The Windowing Function (type WindowFn): Describes how the windowing - Accumulation mode (type Enum): Accumulating, Discarding or Accumulating & Retracting - Allowed lateness (type Duration): For dropping late tuples and purging old state (in conjunction of committed checkpoint) - The Aggregation (type lambda Iterable -> R): How we want to aggregate the tuple data. - Triggering (type Trigger): When we actually output the result to the output port The DimensionOperator will need to implement the above features. 
Also, the DimensionOperator will need to see whether T is actually a KV (instanceof) and if so, the tuples are aggregated by window AND key. This is very preliminary and it's possible that I'm going down the wrong path. So please provide your feedback. > Implement Windowed Operators > > > Key: APEXMALHAR-2085 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2085 > Project: Apache Apex Malhar > Issue Type: New Feature >Reporter: Siyuan Hua >Assignee: Siyuan Hua > > As per our recent several discussions in the community. A group of Windowed > Operators that delivers the window semantic follows the google Data Flow > model(https://cloud.google.com/dataflow/) is very important. > The operators should be designed and implemented in a way for > High-level API > Beam translation > Easy to use with other popular operator > {panel:title=Operator Hierarchy} > Hierarchy of the operators, > The windowed operators should cover all possible transformations that
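Stage 1 of David Yan's outline above (assigning each timestamped tuple to a window) can be sketched for the simplest, non-merging case: a fixed WindowFn that maps an event timestamp to the [start, start + size) window containing it. The class and method names here are illustrative, not the eventual Malhar API.

```java
// Minimal sketch of fixed (non-merging) window assignment, assuming
// millisecond event timestamps; session and sliding windows need more state.
public class FixedWindowAssigner
{
  final long windowSizeMillis;

  FixedWindowAssigner(long windowSizeMillis)
  {
    this.windowSizeMillis = windowSizeMillis;
  }

  // Start of the fixed window that contains the given event time.
  long windowStart(long eventTimeMillis)
  {
    return eventTimeMillis - (eventTimeMillis % windowSizeMillis);
  }

  public static void main(String[] args)
  {
    FixedWindowAssigner fn = new FixedWindowAssigner(60_000L); // 1-minute windows
    // 125s falls into the window starting at 120s.
    System.out.println(fn.windowStart(125_000L)); // prints "120000"
  }
}
```

A WindowedValue would then pair the tuple with the window(s) assigned here before Stage 2 does pane generation and triggering.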
[jira] [Comment Edited] (APEXMALHAR-2085) Implement Windowed Operators
[ https://issues.apache.org/jira/browse/APEXMALHAR-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311123#comment-15311123 ] David Yan edited comment on APEXMALHAR-2085 at 6/1/16 9:13 PM: --- Based on my understanding, this looks like what needs to be done in a very high level. Assuming T is the type of the tuples: 1. Watermark generator operator that takes T as the input and generate TimeStampedValue tuples with watermark tuples. The watermark generator takes the following as configuration. - The function to get the timestamp, equivalent of lambda T -> milliseconds from epoch - watermark type (perfect, heuristic, etc). I need a little more research on how watermark is actually generated 2. A modified DimensionOperator. This has two stages: * Stage 1: Window generator that takes TimestampedValue as the input and generate WindowedValue (WindowedValue is an abstract class from Beam, which has the window information for the tuple). ** The WindowFn object to assign the window(s) for each tuple ** Possibility of merging windows * Stage 2: The actual pane generation and takes WindowedValue as the input and WindowedValue as the output. The output includes any retraction values (as a result from lateness and window merging) This operator takes the following as configuration: - Accumulation mode (type Enum): Accumulating, Discarding or Accumulating & Retracting - Allowed lateness (type Duration): For dropping late tuples and purging old state (in conjunction of committed checkpoint) - The Aggregation (type lambda Iterable -> R): How we want to aggregate the tuple data. - Triggering (type Trigger): When we actually output the result to the output port The DimensionOperator will need to implement the above features. Also, the DimensionOperator will need to see whether T is actually a KV (instanceof) and if so, the tuples are aggregated by window AND key. This is very preliminary and it's possible that I'm going down the wrong path. 
So please provide your feedback. was (Author: davidyan): Based on my understanding, this looks like what needs to be done in a very high level. Assuming T is the type of the tuples: 1. Watermark generator operator that takes T as the input and generate TimeStampedValue tuples with watermark tuples. The watermark generator takes the following as configuration. - The function to get the timestamp, equivalent of lambda T -> milliseconds from epoch - watermark type (perfect, heuristic, etc). I need a little more research on how watermark is actually generated 2. A modified DimensionOperator. This has two stages: * Stage 1: Window generator that takes TimestampedValue as the input and generate WindowedValue (WindowedValue is an abstract class from Beam, which has the window information for the tuple). ** The WindowFn object to assign the window(s) for each tuple ** Possibility of merging windows * Stage 2: The actual pane generation and takes WindowedValue as the input and WindowedValue as the output. The output includes any retraction values. This operator takes the following as configuration: - Accumulation mode (type Enum): Accumulating, Discarding or Accumulating & Retracting - Allowed lateness (type Duration): For dropping late tuples and purging old state (in conjunction of committed checkpoint) - The Aggregation (type lambda Iterable -> R): How we want to aggregate the tuple data. - Triggering (type Trigger): When we actually output the result to the output port The DimensionOperator will need to implement the above features. Also, the DimensionOperator will need to see whether T is actually a KV (instanceof) and if so, the tuples are aggregated by window AND key. This is very preliminary and it's possible that I'm going down the wrong path. So please provide your feedback. 
> Implement Windowed Operators > > > Key: APEXMALHAR-2085 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2085 > Project: Apache Apex Malhar > Issue Type: New Feature >Reporter: Siyuan Hua >Assignee: Siyuan Hua > > As per our recent several discussions in the community. A group of Windowed > Operators that delivers the window semantic follows the google Data Flow > model(https://cloud.google.com/dataflow/) is very important. > The operators should be designed and implemented in a way for > High-level API > Beam translation > Easy to use with other popular operator > {panel:title=Operator Hierarchy} > Hierarchy of the operators, > The windowed operators should cover all possible transformations that require > window, and batch processing is also considered as special window called > global window > {code} >+---+ >+-> | WindowedOperator | <+ >|
[jira] [Commented] (APEXMALHAR-2094) Quantiles sketch operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311082#comment-15311082 ] ASF GitHub Bot commented on APEXMALHAR-2094:

Github user sandeep-n commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/301#discussion_r65440216

--- Diff: sketches/src/main/java/org/apache/apex/malhar/sketches/QuantilesEstimator.java ---
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sketches;
+
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * An implementation of BaseOperator that computes a "sketch" (a representation of the probability distribution using
+ * a low memory footprint) of the incoming numeric data, and evaluates/outputs the cumulative distribution function and
+ * quantiles of the probability distribution. Leverages the quantiles sketch implementation from the Yahoo Datasketches
+ * Library.
+ *
+ * Input Port(s) :
+ * data : Data values input port.
+ *
+ * Output Port(s) :
+ * cdfOutput : cumulative distribution function output port.
+ * quantilesOutput : quantiles output port.
+ *
+ * Partitions : No, partitioning will yield wrong results.
+ */
+@OperatorAnnotation(partitionable = false)
+public class QuantilesEstimator extends BaseOperator
+{
+  /**
+   * Output port that emits cdf estimated at the current data point
+   */
+  public final transient DefaultOutputPort cdfOutput = new DefaultOutputPort<>();
+  /**
+   * Emits quantiles of stream seen thus far
+   */
+  public final transient DefaultOutputPort quantilesOutput = new DefaultOutputPort<>();
+  /**
+   * Emits probability masses on specified intervals
+   */
+  public final transient DefaultOutputPort pmfOutput = new DefaultOutputPort<>();
+  private transient QuantilesSketch quantilesSketch = QuantilesSketch.builder().build();
+  /**
+   * This field determines the specific quantiles to be calculated.
+   * Default is set to compute the standard quartiles.
+   */
+  private double[] fractions = {0.0, 0.25, 0.50, 0.75, 1.00};
+  /**
+   * This field determines the intervals on which the probability mass function is computed.
+   */
+  private double[] pmfIntervals = {};
+  /**
+   * This operator computes three different quantities which are output on separate output ports. If not using any of
+   * these quantities, these variables can be set to avoid unnecessary computation.
+   */
+  private boolean computeCdf = true;
+  private boolean computeQuantiles = true;
+  private boolean computePmf = true;
+
+  public final transient DefaultInputPort<Double> data = new DefaultInputPort<Double>()
+  {
+    @Override
+    public void process(Double input)
+    {
+      quantilesSketch.update(input);
+
+      if (computeQuantiles) {
+        /**
+         * Computes and emits quantiles of the stream seen thus far
+         */
+        quantilesOutput.emit(quantilesSketch.getQuantiles(fractions));
+      }
+
+      if (computeCdf) {
+        /**
+         * Emits (estimate of the) cumulative distribution function evaluated at the input value, according to the
+         * sketched probability distribution of the stream seen thus far.
+         */
+        cdfOutput.emit(quantilesSketch.getCDF(new double[]{input})[0]);
+      }
+
+      if (computePmf) {
+        pmfOutput.emit(quantilesSketch.getPMF(pmfIntervals));
+      }
+    }
+  };
+
+  /**
+   * Constructor that allows non-default
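For reference, the quantile and CDF values the (truncated) operator above emits can be computed exactly over a small in-memory array; the DataSketches quantiles sketch produces approximations of these same quantities in bounded memory. This plain-Java illustration is not part of the operator.

```java
import java.util.Arrays;

// Exact versions of the two quantities QuantilesEstimator emits, for a small
// dataset that fits in memory; a sketch trades exactness for bounded space.
public class ExactQuantiles
{
  // Empirical CDF: fraction of observed values <= x.
  static double cdf(double[] sorted, double x)
  {
    int count = 0;
    for (double v : sorted) {
      if (v <= x) {
        count++;
      }
    }
    return (double)count / sorted.length;
  }

  // Value at the given fraction of the sorted data (0.0 = min, 1.0 = max).
  static double quantile(double[] sorted, double fraction)
  {
    int idx = (int)Math.round(fraction * (sorted.length - 1));
    return sorted[idx];
  }

  public static void main(String[] args)
  {
    double[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    Arrays.sort(data);
    System.out.println(quantile(data, 0.50)); // prints "6.0"
    System.out.println(cdf(data, 5));         // prints "0.5"
  }
}
```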
[jira] [Commented] (APEXMALHAR-2099) Identify overlap between Beam API and existing Apex Stream API
[ https://issues.apache.org/jira/browse/APEXMALHAR-2099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310987#comment-15310987 ] Ilya Ganelin commented on APEXMALHAR-2099:

The current implementation of the Apex Stream API (ApexStreamImpl.java) supports the following functions:
- map
- flatMap
- filter
- reduce
- fold

On the Beam side, there is no strict "API" as far as transformations go. Instead, Beam defines a PTransform class that implements an "apply" function, which applies a given function to incoming data represented as a PCollection. There are presently on the order of 40 different transformations implemented for Beam: https://github.com/apache/incubator-beam/tree/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms

The analogs to the Stream API are:
Apex => Beam
map => ParDo
flatMap => FlatMapElements
filter => Filter
reduce => Combine (sort of)

In general, Beam presently supports a much greater variety of transformations, as well as different classes of transformation: for example, some transformations are applied over a window, while others are applied on a per-tuple basis. The windowing behavior can be explicitly specified by defining a windowing strategy.

A key limitation of the current Apex Stream API is its lack of cross-stream interaction: operations like groupByKey or join are not defined within its scope, which seriously limits the applications that can be built with it.

> Identify overlap between Beam API and existing Apex Stream API
> --
>
> Key: APEXMALHAR-2099
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2099
> Project: Apache Apex Malhar
> Issue Type: Sub-task
> Reporter: Ilya Ganelin
>
> There should be some overlap between the Beam API and the recently released
> Apex Stream API. This task captures the need to understand and document this
> overlap.
> AC:
> * A document or JIRA comment identifying which components of the Beam API are
> implemented, similar, or absent within the Apex Stream API.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
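The four per-tuple operations the two APIs share behave like their java.util.stream counterparts, shown here purely as a semantic vocabulary reference; no Apex or Beam types are involved.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// map / flatMap / filter / reduce semantics, as both APIs use the terms.
public class PerTupleOps
{
  public static void main(String[] args)
  {
    List<String> words = Arrays.asList("apex", "beam");

    // map: exactly one output element per input element
    List<Integer> lengths = words.stream().map(String::length).collect(Collectors.toList());

    // flatMap: zero or more output elements per input element
    List<String> letters = words.stream()
        .flatMap(w -> Arrays.stream(w.split(""))).collect(Collectors.toList());

    // filter: keep only elements matching a predicate
    List<String> aWords = words.stream()
        .filter(w -> w.startsWith("a")).collect(Collectors.toList());

    // reduce: combine all elements into a single value
    int totalLength = lengths.stream().reduce(0, Integer::sum);

    System.out.println(lengths);        // prints "[4, 4]"
    System.out.println(letters.size()); // prints "8"
    System.out.println(aWords);         // prints "[apex]"
    System.out.println(totalLength);    // prints "8"
  }
}
```

The window-scoped transforms (and cross-stream ones like groupByKey or join) have no such single-stream analog, which is exactly the gap the comment above identifies.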
[jira] [Commented] (APEXCORE-466) Improve logging from the *Agent.java files
[ https://issues.apache.org/jira/browse/APEXCORE-466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310904#comment-15310904 ] ASF GitHub Bot commented on APEXCORE-466: - Github user ilganeli commented on the issue: https://github.com/apache/incubator-apex-core/pull/339 LGTM > Improve logging from the *Agent.java files > -- > > Key: APEXCORE-466 > URL: https://issues.apache.org/jira/browse/APEXCORE-466 > Project: Apache Apex Core > Issue Type: Improvement >Reporter: David Yan >Assignee: David Yan > > Currently we are getting stack traces logging that actually do not > necessarily indicate an error. For example: > {code} > 2016-05-20 11:56:22,859 WARN com.datatorrent.stram.client.EventsAgent: Got > exception when reading events > java.io.FileNotFoundException: File does not exist: > /user/david/datatorrent/apps/application_1462948052533_0204/events/index.txt > at > org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66) > at > org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1932) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1873) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1853) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1825) > at > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:559) > at > org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.j > ava:87) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorP > B.java:363) > at > 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos. > java) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619) > at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060) > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044) > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:415) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671) > at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2038) > {code} > This stack trace only indicates that no events have been logged yet for the > application. > We need to reduce this kind of logging to prevent false alarms to the user. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
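The logging improvement discussed above can be sketched as follows. This is a hedged illustration, not EventsAgent's real code path: readEvents is a stand-in that simulates the HDFS miss. The idea is that an absent events file is an expected condition deserving a one-line message, while genuinely unexpected IO errors keep the full detail.

```java
import java.io.FileNotFoundException;
import java.io.IOException;

// Sketch: distinguish the expected "no index file yet" case from real errors.
public class QuietMissingFile
{
  static String readEvents(String path)
  {
    try {
      // Simulate the HDFS lookup failing because the app has no events yet.
      throw new FileNotFoundException("File does not exist: " + path);
    } catch (FileNotFoundException e) {
      // Expected before the app logs its first event: one line, no stack trace.
      return "no events yet: " + e.getMessage();
    } catch (IOException e) {
      // Unexpected: this is where full details are still warranted.
      return "error: " + e;
    }
  }

  public static void main(String[] args)
  {
    System.out.println(readEvents("/user/david/datatorrent/apps/app_123/events/index.txt"));
  }
}
```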
[jira] [Commented] (APEXCORE-463) get-app-package-operators in ApexCLI not listing certain modules
[ https://issues.apache.org/jira/browse/APEXCORE-463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310893#comment-15310893 ] ASF GitHub Bot commented on APEXCORE-463:

Github user siyuanh commented on a diff in the pull request: https://github.com/apache/incubator-apex-core/pull/338#discussion_r65421945

--- Diff: engine/src/main/java/com/datatorrent/stram/webapp/TypeGraph.java ---
@@ -355,7 +356,8 @@ public int size()
     }
     Set result = new TreeSet<>();
     for (TypeGraphVertex node : tgv.allInstantiableDescendants) {
-      if ((isAncestor(InputOperator.class.getName(), node.typeName) || !getAllInputPorts(node).isEmpty())) {
+      if ((isAncestor(InputOperator.class.getName(), node.typeName) || isAncestor(Module.class.getName(), node.typeName)
--- End diff --

A method should do what its name suggests. Either change the name of this method or put this check in a separate method. The same problem exists in TypeGraph and OperatorDiscover; please fix those as well. Otherwise it will be confusing and error-prone in the future.

> get-app-package-operators in ApexCLI not listing certain modules
> -
>
> Key: APEXCORE-463
> URL: https://issues.apache.org/jira/browse/APEXCORE-463
> Project: Apache Apex Core
> Issue Type: Bug
> Reporter: shubham pathak
> Assignee: shubham pathak
>
> FSInputModule is not being shown in output of get-app-package-operators

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXMALHAR-2103) scanner issues in FileSplitterInput class
[ https://issues.apache.org/jira/browse/APEXMALHAR-2103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310852#comment-15310852 ] ASF GitHub Bot commented on APEXMALHAR-2103:

Github user DT-Priyanka commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/300#discussion_r65418317

--- Diff: library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java ---
@@ -155,6 +155,38 @@ public void testFileMetadata() throws InterruptedException
   }

   @Test
+  public void testFileMetadataForSingleFile() throws InterruptedException
--- End diff --

There is already a test named "testSingleFile"; can we rename this one to something like "testScannerFilterForDuplicates"?

> scanner issues in FileSplitterInput class
> -
>
> Key: APEXMALHAR-2103
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2103
> Project: Apache Apex Malhar
> Issue Type: Bug
> Reporter: Chaitanya
> Assignee: Chaitanya
>
> Issue: FileSplitter continuously emits file metadata even when there is only
> a single file.
> Observation: For the same file, the keys used while updating and accessing the
> referenceTimes map differ between FileSplitterInput and TimeBasedScanner.
> Because of this, the oldestTimeModification is always null in
> TimeBasedScanner.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] incubator-apex-malhar pull request #300: APEXMALHAR-2103 Fixed the scanner i...
Github user DT-Priyanka commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/300#discussion_r65418498 --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java --- @@ -375,11 +375,14 @@ public void run() lastScannedInfo = null; numDiscoveredPerIteration = 0; for (String afile : files) { - String filePath = new File(afile).getAbsolutePath(); + Path filePath = new Path(afile); LOG.debug("Scan started for input {}", filePath); - Map<String, Long> lastModifiedTimesForInputDir; - lastModifiedTimesForInputDir = referenceTimes.get(filePath); - scan(new Path(afile), null, lastModifiedTimesForInputDir); + Map<String, Long> lastModifiedTimesForInputDir = null; + if (fs.exists(filePath)) { + FileStatus fileStatus = fs.getFileStatus(filePath); + lastModifiedTimesForInputDir = referenceTimes.get(fileStatus.getPath().toUri().getPath()); --- End diff -- Can you use "filePath" instead of fileStatus.getPath()?
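The bug being fixed here is a key-mismatch: entries in the referenceTimes map are stored under an absolute path (by createScannedFileInfo()), but on a local file system the scan may look them up under the relative path the user configured, so the lookup always misses. The PR normalizes the key through Hadoop's FileSystem before every access; the same idea can be shown dependency-free with java.io.File — class and method names below are illustrative, not the actual FileSplitterInput code:

```java
import java.io.File;
import java.util.HashMap;
import java.util.Map;

// Sketch of the referenceTimes key-mismatch and its fix: always
// normalize the path to one canonical form before touching the map.
public class PathKeySketch {

  // Bug pattern: looks up with whatever form the caller passed in,
  // which may differ from the form used when the entry was stored.
  public static Long lookupNaive(Map<String, Long> referenceTimes, String configuredPath) {
    return referenceTimes.get(configuredPath);
  }

  // Fix pattern: normalize to the absolute form on every access, so
  // store and lookup agree regardless of how the path was written.
  public static Long lookupNormalized(Map<String, Long> referenceTimes, String configuredPath) {
    return referenceTimes.get(new File(configuredPath).getAbsolutePath());
  }

  public static void main(String[] args) {
    Map<String, Long> referenceTimes = new HashMap<>();
    // Stored under the absolute form, as the scanner's file-info path is.
    referenceTimes.put(new File("data/in").getAbsolutePath(), 12345L);

    // Relative lookup misses; normalized lookup finds the entry.
    System.out.println(lookupNaive(referenceTimes, "data/in"));
    System.out.println(lookupNormalized(referenceTimes, "data/in"));
  }
}
```

In the actual fix the canonical form comes from `fs.getFileStatus(filePath).getPath().toUri().getPath()`, which resolves relative local paths the same way on both the store and lookup sides.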
[GitHub] incubator-apex-malhar pull request #300: APEXMALHAR-2103 Fixed the scanner i...
Github user DT-Priyanka commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/300#discussion_r65418317 --- Diff: library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java --- @@ -155,6 +155,38 @@ public void testFileMetadata() throws InterruptedException } @Test + public void testFileMetadataForSingleFile() throws InterruptedException --- End diff -- There is already a test named "testSingleFile"; can we rename this to something like "testScannerFilterForDuplicates"?
[jira] [Commented] (APEXMALHAR-1978) Replace ${groupId} with ${project.groupId} in modules and project pom
[ https://issues.apache.org/jira/browse/APEXMALHAR-1978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310847#comment-15310847 ] ASF GitHub Bot commented on APEXMALHAR-1978: GitHub user ilganeli opened a pull request: https://github.com/apache/incubator-apex-core/pull/348 [APEXMALHAR-1978] Replaced usages of {groupId} with {project.groupId} * There were no instances of this within the Malhar project. * Fixed usages within Core You can merge this pull request into a Git repository by running: $ git pull https://github.com/ilganeli/incubator-apex-core APEXMALHAR-1978 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-apex-core/pull/348.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #348 commit 5f8d3e6d37f5353a70922a998089d862846e1546 Author: Ilya Ganelin Date: 2016-06-01T18:31:22Z Replaced usages of {groupId} with {project.groupId} removed style change. > Replace ${groupId} with ${project.groupId} in modules and project pom > - > > Key: APEXMALHAR-1978 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-1978 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Vlad Rozov >Priority: Minor > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] incubator-apex-core pull request #348: [APEXMALHAR-1978] Replaced usages of ...
GitHub user ilganeli opened a pull request: https://github.com/apache/incubator-apex-core/pull/348 [APEXMALHAR-1978] Replaced usages of {groupId} with {project.groupId} * There were no instances of this within the Malhar project. * Fixed usages within Core You can merge this pull request into a Git repository by running: $ git pull https://github.com/ilganeli/incubator-apex-core APEXMALHAR-1978 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-apex-core/pull/348.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #348 commit 5f8d3e6d37f5353a70922a998089d862846e1546 Author: Ilya Ganelin Date: 2016-06-01T18:31:22Z Replaced usages of {groupId} with {project.groupId} removed style change.
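The change itself is a one-token substitution in the POMs. A minimal before/after illustration (the artifact name is hypothetical): `${groupId}` is the deprecated shorthand and can be shadowed by a same-named user property, while `${project.groupId}` unambiguously refers to the declaring project's coordinates.

```xml
<!-- Before: deprecated shorthand, can be shadowed by a property -->
<dependency>
  <groupId>${groupId}</groupId>
  <artifactId>some-module</artifactId>
  <version>${project.version}</version>
</dependency>

<!-- After: always resolves to the declaring project's groupId -->
<dependency>
  <groupId>${project.groupId}</groupId>
  <artifactId>some-module</artifactId>
  <version>${project.version}</version>
</dependency>
```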
[jira] [Closed] (APEXMALHAR-537) Need a Session computation operator which works off of HTTP log
[ https://issues.apache.org/jira/browse/APEXMALHAR-537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ashwin Chandra Putta closed APEXMALHAR-537. --- Resolution: Invalid > Need a Session computation operator which works off of HTTP log > --- > > Key: APEXMALHAR-537 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-537 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Chetan Narsude >Assignee: Ashwin Chandra Putta > Labels: feature > > Session operator has various attributes - the average, median, max duration > of the session. The number of sessions per user, total number of sessions etc. > Develop an operator which takes in the HTTP log stream and annotates the log > entries with the session they belong to. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (APEXMALHAR-281) [logstream] add alerts
[ https://issues.apache.org/jira/browse/APEXMALHAR-281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ashwin Chandra Putta closed APEXMALHAR-281. --- Resolution: Invalid > [logstream] add alerts > -- > > Key: APEXMALHAR-281 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-281 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Ashwin Chandra Putta >Assignee: Ashwin Chandra Putta > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (APEXMALHAR-1150) create operator that can convert JSON string to object and vice versa
[ https://issues.apache.org/jira/browse/APEXMALHAR-1150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ashwin Chandra Putta closed APEXMALHAR-1150. Resolution: Won't Fix > create operator that can convert JSON string to object and vice versa > --- > > Key: APEXMALHAR-1150 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-1150 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Ashwin Chandra Putta >Assignee: Ashwin Chandra Putta > > Create an operator that can > 1. convert JSON byte[] or string into an object > 2. serialize object as JSON -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXMALHAR-1150) create operator that can convert JSON string to object and vice versa
[ https://issues.apache.org/jira/browse/APEXMALHAR-1150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310830#comment-15310830 ] Ashwin Chandra Putta commented on APEXMALHAR-1150: -- Correct, closing this. > create operator that can convert JSON string to object and vice versa > --- > > Key: APEXMALHAR-1150 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-1150 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Ashwin Chandra Putta >Assignee: Ashwin Chandra Putta > > Create an operator that can > 1. convert JSON byte[] or string into an object > 2. serialize object as JSON -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (APEXMALHAR-1769) Hyperloglog operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-1769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ilya Ganelin resolved APEXMALHAR-1769. -- Resolution: Duplicate Duplicates APEXMALHAR-1822 > Hyperloglog operator > > > Key: APEXMALHAR-1769 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-1769 > Project: Apache Apex Malhar > Issue Type: Task > Components: algorithms >Reporter: Amol >Assignee: Amol > > It would be nice to have a HyperLogLog operator -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Reopened] (APEXMALHAR-1769) Hyperloglog operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-1769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ilya Ganelin reopened APEXMALHAR-1769: -- > Hyperloglog operator > > > Key: APEXMALHAR-1769 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-1769 > Project: Apache Apex Malhar > Issue Type: Task > Components: algorithms >Reporter: Amol >Assignee: Amol > > It would be nice to have a HyperLogLog operator -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (APEXMALHAR-1769) Hyperloglog operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-1769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ilya Ganelin resolved APEXMALHAR-1769. -- Resolution: Duplicate This issue duplicates APEXMALHAR-1822 > Hyperloglog operator > > > Key: APEXMALHAR-1769 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-1769 > Project: Apache Apex Malhar > Issue Type: Task > Components: algorithms >Reporter: Amol >Assignee: Amol > > It would be nice to have a HyperLogLog operator -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXMALHAR-1150) create operator that can convert JSON string to object and vice versa
[ https://issues.apache.org/jira/browse/APEXMALHAR-1150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310807#comment-15310807 ] Ilya Ganelin commented on APEXMALHAR-1150: -- This appears resolved between the JsonFormatter, JsonParser, and JsonByteArrayOperator classes. [~ashwinchandrap] can you confirm? > create operator that can convert JSON string to object and vice versa > --- > > Key: APEXMALHAR-1150 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-1150 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Ashwin Chandra Putta >Assignee: Ashwin Chandra Putta > > Create an operator that can > 1. convert JSON byte[] or string into an object > 2. serialize object as JSON -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXMALHAR-175) KeyValPair isn't automatically sticky partitioned for key
[ https://issues.apache.org/jira/browse/APEXMALHAR-175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310792#comment-15310792 ] Ilya Ganelin commented on APEXMALHAR-175: - [~pra...@datatorrent.com] Which operators have this invalid usage? > KeyValPair isn't automatically sticky partitioned for key > - > > Key: APEXMALHAR-175 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-175 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Pramod Immaneni >Assignee: Ilya Ganelin > > KeyValPair is being used in some operators with the assumption that it will > be sticky partitioned by key, but it doesn't override the hashCode function > to return the key's hashCode. KeyValPair extends AbstractMap.SimpleEntry, which > overrides hashCode as a function of both key and value. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
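The problem described in the issue is that `AbstractMap.SimpleEntry.hashCode()` mixes key and value hashes, so two tuples with the same key but different values can hash (and thus partition) differently. A minimal sketch of the KeyHashValPair-style fix — the class name mirrors Malhar's, but the body here is illustrative, not the library source:

```java
import java.util.AbstractMap;

// Sketch: sticky-by-key partitioning requires a key-only hashCode.
// SimpleEntry.hashCode() is keyHash ^ valueHash, so same-key pairs
// with different values would otherwise land on different partitions.
public class KeyHashValPairSketch<K, V> extends AbstractMap.SimpleEntry<K, V> {
  private static final long serialVersionUID = 1L;

  public KeyHashValPairSketch(K key, V value) {
    super(key, value);
  }

  @Override
  public int hashCode() {
    // Hash on the key alone so every value for a given key hashes
    // identically. This stays consistent with SimpleEntry.equals():
    // equal entries have equal keys and therefore equal hashes.
    return getKey() == null ? 0 : getKey().hashCode();
  }
}
```

With this override, a hash-based stream partitioner routes all tuples for a key to the same partition, which is the "sticky" behavior the operators assumed.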
[jira] [Commented] (APEXMALHAR-2105) Enhance CSV Formatter to take in schema similar to Csv Parser
[ https://issues.apache.org/jira/browse/APEXMALHAR-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310787#comment-15310787 ] ASF GitHub Bot commented on APEXMALHAR-2105: Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/306#discussion_r65412409 --- Diff: contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java --- @@ -58,215 +68,156 @@ public class CsvFormatter extends Formatter { - private ArrayList fields; - @NotNull - protected String classname; - @NotNull - protected int fieldDelimiter; - protected String lineDelimiter; + /** + * Names of all the fields in the same order that would appear in output + * records + */ + private transient String[] nameMapping; + /** + * Cell processors are an integral part of reading and writing with Super CSV + * they automate the data type conversions, and enforce constraints. + */ + private transient CellProcessor[] processors; + /** + * Writing preferences that are passed through schema + */ + private transient CsvPreference preference; + /** + * Contents of the schema.Schema is specified in a json format as per + * {@link DelimitedSchema} + */ @NotNull - protected String fieldInfo; + private String schema; + /** + * Schema is read into this object to access fields + */ + private transient DelimitedSchema delimitedParserSchema; - public enum FIELD_TYPE - { -BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE - }; + /** + * metric to keep count of number of tuples emitted on error port port + */ + @AutoMetric + protected long errorTupleCount; + + /** + * metric to keep count of number of tuples emitted on out port + */ + @AutoMetric + protected long emittedObjectCount; - protected transient String[] nameMapping; - protected transient CellProcessor[] processors; - protected transient CsvPreference preference; + /** + * metric to keep count of number of tuples emitted on input port + */ + 
@AutoMetric + protected long incomingTuplesCount; public CsvFormatter() { -fields = new ArrayList(); -fieldDelimiter = ','; -lineDelimiter = "\r\n"; + } + @Override + public void beginWindow(long windowId) + { +super.beginWindow(windowId); +errorTupleCount = 0; +emittedObjectCount = 0; +incomingTuplesCount = 0; } @Override public void setup(Context.OperatorContext context) { super.setup(context); - -//fieldInfo information -fields = new ArrayList(); -String[] fieldInfoTuple = fieldInfo.split(","); -for (int i = 0; i < fieldInfoTuple.length; i++) { - String[] fieldTuple = fieldInfoTuple[i].split(":"); - Field field = new Field(); - field.setName(fieldTuple[0]); - String[] typeFormat = fieldTuple[1].split("\\|"); - field.setType(typeFormat[0].toUpperCase()); - if (typeFormat.length > 1) { -field.setFormat(typeFormat[1]); - } - getFields().add(field); -} -preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build(); -int countKeyValue = getFields().size(); -nameMapping = new String[countKeyValue]; -processors = new CellProcessor[countKeyValue]; -initialise(nameMapping, processors); - +delimitedParserSchema = new DelimitedSchema(schema); +preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(), +delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build(); +nameMapping = delimitedParserSchema.getFieldNames() +.toArray(new String[delimitedParserSchema.getFieldNames().size()]); +processors = getProcessor(delimitedParserSchema.getFields()); } - private void initialise(String[] nameMapping, CellProcessor[] processors) + /** + * Returns array of cellprocessors, one for each field + */ + private CellProcessor[] getProcessor(List fields) { -for (int i = 0; i < getFields().size(); i++) { - FIELD_TYPE type = getFields().get(i).type; - nameMapping[i] = getFields().get(i).name; - if (type == FIELD_TYPE.DATE) { -String dateFormat = getFields().get(i).format; -processors[i] = new Optional(new 
FmtDate(dateFormat == null ? "dd/MM/" : dateFormat)); +CellProcessor[] processor = new CellProcessor[fields.size()]; +
[GitHub] incubator-apex-malhar pull request #306: APEXMALHAR-2105 enhancing CSV forma...
Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/306#discussion_r65412409 --- Diff: contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java --- @@ -58,215 +68,156 @@ public class CsvFormatter extends Formatter { - private ArrayList fields; - @NotNull - protected String classname; - @NotNull - protected int fieldDelimiter; - protected String lineDelimiter; + /** + * Names of all the fields in the same order that would appear in output + * records + */ + private transient String[] nameMapping; + /** + * Cell processors are an integral part of reading and writing with Super CSV + * they automate the data type conversions, and enforce constraints. + */ + private transient CellProcessor[] processors; + /** + * Writing preferences that are passed through schema + */ + private transient CsvPreference preference; + /** + * Contents of the schema.Schema is specified in a json format as per + * {@link DelimitedSchema} + */ @NotNull - protected String fieldInfo; + private String schema; + /** + * Schema is read into this object to access fields + */ + private transient DelimitedSchema delimitedParserSchema; - public enum FIELD_TYPE - { -BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE - }; + /** + * metric to keep count of number of tuples emitted on error port port + */ + @AutoMetric + protected long errorTupleCount; + + /** + * metric to keep count of number of tuples emitted on out port + */ + @AutoMetric + protected long emittedObjectCount; - protected transient String[] nameMapping; - protected transient CellProcessor[] processors; - protected transient CsvPreference preference; + /** + * metric to keep count of number of tuples emitted on input port + */ + @AutoMetric + protected long incomingTuplesCount; public CsvFormatter() { -fields = new ArrayList(); -fieldDelimiter = ','; -lineDelimiter = "\r\n"; + } + @Override + public void beginWindow(long windowId) 
+ { +super.beginWindow(windowId); +errorTupleCount = 0; +emittedObjectCount = 0; +incomingTuplesCount = 0; } @Override public void setup(Context.OperatorContext context) { super.setup(context); - -//fieldInfo information -fields = new ArrayList(); -String[] fieldInfoTuple = fieldInfo.split(","); -for (int i = 0; i < fieldInfoTuple.length; i++) { - String[] fieldTuple = fieldInfoTuple[i].split(":"); - Field field = new Field(); - field.setName(fieldTuple[0]); - String[] typeFormat = fieldTuple[1].split("\\|"); - field.setType(typeFormat[0].toUpperCase()); - if (typeFormat.length > 1) { -field.setFormat(typeFormat[1]); - } - getFields().add(field); -} -preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build(); -int countKeyValue = getFields().size(); -nameMapping = new String[countKeyValue]; -processors = new CellProcessor[countKeyValue]; -initialise(nameMapping, processors); - +delimitedParserSchema = new DelimitedSchema(schema); +preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(), +delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build(); +nameMapping = delimitedParserSchema.getFieldNames() +.toArray(new String[delimitedParserSchema.getFieldNames().size()]); +processors = getProcessor(delimitedParserSchema.getFields()); } - private void initialise(String[] nameMapping, CellProcessor[] processors) + /** + * Returns array of cellprocessors, one for each field + */ + private CellProcessor[] getProcessor(List fields) { -for (int i = 0; i < getFields().size(); i++) { - FIELD_TYPE type = getFields().get(i).type; - nameMapping[i] = getFields().get(i).name; - if (type == FIELD_TYPE.DATE) { -String dateFormat = getFields().get(i).format; -processors[i] = new Optional(new FmtDate(dateFormat == null ? 
"dd/MM/" : dateFormat)); +CellProcessor[] processor = new CellProcessor[fields.size()]; +int fieldCount = 0; +for (Field field : fields) { + if (field.getType() == FieldType.DATE) { +String format = field.getConstraints().get(DelimitedSchema.DATE_FORMAT) == null ? null +:
[GitHub] incubator-apex-malhar pull request #306: APEXMALHAR-2105 enhancing CSV forma...
Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/306#discussion_r65412319 --- Diff: contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java --- @@ -58,215 +68,156 @@ public class CsvFormatter extends Formatter { - private ArrayList fields; - @NotNull - protected String classname; - @NotNull - protected int fieldDelimiter; - protected String lineDelimiter; + /** + * Names of all the fields in the same order that would appear in output + * records + */ + private transient String[] nameMapping; + /** + * Cell processors are an integral part of reading and writing with Super CSV + * they automate the data type conversions, and enforce constraints. + */ + private transient CellProcessor[] processors; + /** + * Writing preferences that are passed through schema + */ + private transient CsvPreference preference; + /** + * Contents of the schema.Schema is specified in a json format as per + * {@link DelimitedSchema} + */ @NotNull - protected String fieldInfo; + private String schema; + /** + * Schema is read into this object to access fields + */ + private transient DelimitedSchema delimitedParserSchema; - public enum FIELD_TYPE - { -BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE - }; + /** + * metric to keep count of number of tuples emitted on error port port + */ + @AutoMetric + protected long errorTupleCount; + + /** + * metric to keep count of number of tuples emitted on out port + */ + @AutoMetric + protected long emittedObjectCount; - protected transient String[] nameMapping; - protected transient CellProcessor[] processors; - protected transient CsvPreference preference; + /** + * metric to keep count of number of tuples emitted on input port + */ + @AutoMetric + protected long incomingTuplesCount; public CsvFormatter() { -fields = new ArrayList(); -fieldDelimiter = ','; -lineDelimiter = "\r\n"; + } + @Override + public void beginWindow(long windowId) 
+ { +super.beginWindow(windowId); +errorTupleCount = 0; +emittedObjectCount = 0; +incomingTuplesCount = 0; } @Override public void setup(Context.OperatorContext context) { super.setup(context); - -//fieldInfo information -fields = new ArrayList(); -String[] fieldInfoTuple = fieldInfo.split(","); -for (int i = 0; i < fieldInfoTuple.length; i++) { - String[] fieldTuple = fieldInfoTuple[i].split(":"); - Field field = new Field(); - field.setName(fieldTuple[0]); - String[] typeFormat = fieldTuple[1].split("\\|"); - field.setType(typeFormat[0].toUpperCase()); - if (typeFormat.length > 1) { -field.setFormat(typeFormat[1]); - } - getFields().add(field); -} -preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build(); -int countKeyValue = getFields().size(); -nameMapping = new String[countKeyValue]; -processors = new CellProcessor[countKeyValue]; -initialise(nameMapping, processors); - +delimitedParserSchema = new DelimitedSchema(schema); +preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(), +delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build(); +nameMapping = delimitedParserSchema.getFieldNames() +.toArray(new String[delimitedParserSchema.getFieldNames().size()]); +processors = getProcessor(delimitedParserSchema.getFields()); } - private void initialise(String[] nameMapping, CellProcessor[] processors) + /** + * Returns array of cellprocessors, one for each field + */ + private CellProcessor[] getProcessor(List fields) { -for (int i = 0; i < getFields().size(); i++) { - FIELD_TYPE type = getFields().get(i).type; - nameMapping[i] = getFields().get(i).name; - if (type == FIELD_TYPE.DATE) { -String dateFormat = getFields().get(i).format; -processors[i] = new Optional(new FmtDate(dateFormat == null ? 
"dd/MM/" : dateFormat)); +CellProcessor[] processor = new CellProcessor[fields.size()]; +int fieldCount = 0; +for (Field field : fields) { + if (field.getType() == FieldType.DATE) { +String format = field.getConstraints().get(DelimitedSchema.DATE_FORMAT) == null ? null +:
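The refactored CsvFormatter above builds one Super CSV CellProcessor per schema field inside getProcessor(), dispatching on each field's declared type. The same per-field dispatch can be sketched without the Super CSV dependency — the `FieldType` values, `Field` shape, and default date pattern below are illustrative stand-ins, not the operator's actual schema classes:

```java
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.function.Function;

// Stand-in for CsvFormatter.getProcessor(): one formatter per schema
// field, chosen by the field's declared type.
public class ProcessorSketch {
  enum FieldType { DATE, INTEGER, STRING }

  static class Field {
    final FieldType type;
    final String dateFormat; // null unless type == DATE and a format is set
    Field(FieldType type, String dateFormat) {
      this.type = type;
      this.dateFormat = dateFormat;
    }
  }

  static List<Function<Object, String>> buildProcessors(List<Field> fields) {
    List<Function<Object, String>> processors = new ArrayList<>();
    for (Field f : fields) {
      if (f.type == FieldType.DATE) {
        // Fall back to a default pattern when the schema gives none;
        // the real operator reads this from the field's constraints.
        String fmt = f.dateFormat == null ? "dd/MM/yyyy" : f.dateFormat;
        processors.add(v -> new SimpleDateFormat(fmt).format((Date) v));
      } else {
        // Non-date fields are written via plain string conversion.
        processors.add(String::valueOf);
      }
    }
    return processors;
  }
}
```

Keeping one processor per field, in schema order, is what lets the writer walk the name mapping and the processor array in lockstep when emitting a record.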
[jira] [Commented] (APEXMALHAR-2105) Enhance CSV Formatter to take in schema similar to Csv Parser
[ https://issues.apache.org/jira/browse/APEXMALHAR-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310776#comment-15310776 ] ASF GitHub Bot commented on APEXMALHAR-2105: Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/306#discussion_r65411669 --- Diff: contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java --- @@ -58,215 +68,156 @@ public class CsvFormatter extends Formatter { - private ArrayList fields; - @NotNull - protected String classname; - @NotNull - protected int fieldDelimiter; - protected String lineDelimiter; + /** + * Names of all the fields in the same order that would appear in output + * records + */ + private transient String[] nameMapping; + /** + * Cell processors are an integral part of reading and writing with Super CSV + * they automate the data type conversions, and enforce constraints. + */ + private transient CellProcessor[] processors; + /** + * Writing preferences that are passed through schema + */ + private transient CsvPreference preference; + /** + * Contents of the schema.Schema is specified in a json format as per + * {@link DelimitedSchema} + */ @NotNull - protected String fieldInfo; + private String schema; + /** + * Schema is read into this object to access fields + */ + private transient DelimitedSchema delimitedParserSchema; - public enum FIELD_TYPE - { -BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE - }; + /** + * metric to keep count of number of tuples emitted on error port port + */ + @AutoMetric + protected long errorTupleCount; + + /** + * metric to keep count of number of tuples emitted on out port + */ + @AutoMetric + protected long emittedObjectCount; - protected transient String[] nameMapping; - protected transient CellProcessor[] processors; - protected transient CsvPreference preference; + /** + * metric to keep count of number of tuples emitted on input port + */ + 
@AutoMetric + protected long incomingTuplesCount; public CsvFormatter() --- End diff -- You can remove the constructor since it is empty. > Enhance CSV Formatter to take in schema similar to Csv Parser > - > > Key: APEXMALHAR-2105 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2105 > Project: Apache Apex Malhar > Issue Type: Improvement >Reporter: shubham pathak > > CSV Parser takes in a schema that specifies field names and constraints. CSV > Formatter also needs the same information, but in the current implementation, it > takes it as "fieldInfo". Enhancing CSV Formatter to support the same schema > as CSV Parser would make it simpler for the end user. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] incubator-apex-malhar pull request #306: APEXMALHAR-2105 enhancing CSV forma...
Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/306#discussion_r65411669 --- Diff: contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java --- @@ -58,215 +68,156 @@ public class CsvFormatter extends Formatter { - private ArrayList fields; - @NotNull - protected String classname; - @NotNull - protected int fieldDelimiter; - protected String lineDelimiter; + /** + * Names of all the fields in the same order that would appear in output + * records + */ + private transient String[] nameMapping; + /** + * Cell processors are an integral part of reading and writing with Super CSV + * they automate the data type conversions, and enforce constraints. + */ + private transient CellProcessor[] processors; + /** + * Writing preferences that are passed through schema + */ + private transient CsvPreference preference; + /** + * Contents of the schema.Schema is specified in a json format as per + * {@link DelimitedSchema} + */ @NotNull - protected String fieldInfo; + private String schema; + /** + * Schema is read into this object to access fields + */ + private transient DelimitedSchema delimitedParserSchema; - public enum FIELD_TYPE - { -BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE - }; + /** + * metric to keep count of number of tuples emitted on error port port + */ + @AutoMetric + protected long errorTupleCount; + + /** + * metric to keep count of number of tuples emitted on out port + */ + @AutoMetric + protected long emittedObjectCount; - protected transient String[] nameMapping; - protected transient CellProcessor[] processors; - protected transient CsvPreference preference; + /** + * metric to keep count of number of tuples emitted on input port + */ + @AutoMetric + protected long incomingTuplesCount; public CsvFormatter() --- End diff -- You can remove the constructor as nothing is there in it. 
[jira] [Commented] (APEXMALHAR-2105) Enhance CSV Formatter to take in schema similar to Csv Parser
[ https://issues.apache.org/jira/browse/APEXMALHAR-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310775#comment-15310775 ] ASF GitHub Bot commented on APEXMALHAR-2105: Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/306#discussion_r65411586 --- Diff: contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java --- @@ -58,215 +68,156 @@ public class CsvFormatter extends Formatter { - private ArrayList fields; - @NotNull - protected String classname; - @NotNull - protected int fieldDelimiter; - protected String lineDelimiter; + /** + * Names of all the fields in the same order that would appear in output + * records + */ + private transient String[] nameMapping; + /** + * Cell processors are an integral part of reading and writing with Super CSV + * they automate the data type conversions, and enforce constraints. + */ + private transient CellProcessor[] processors; + /** + * Writing preferences that are passed through schema + */ + private transient CsvPreference preference; + /** + * Contents of the schema.Schema is specified in a json format as per + * {@link DelimitedSchema} + */ @NotNull - protected String fieldInfo; + private String schema; + /** + * Schema is read into this object to access fields + */ + private transient DelimitedSchema delimitedParserSchema; - public enum FIELD_TYPE - { -BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE - }; + /** + * metric to keep count of number of tuples emitted on error port port + */ + @AutoMetric + protected long errorTupleCount; + + /** + * metric to keep count of number of tuples emitted on out port + */ + @AutoMetric + protected long emittedObjectCount; - protected transient String[] nameMapping; - protected transient CellProcessor[] processors; - protected transient CsvPreference preference; + /** + * metric to keep count of number of tuples emitted on input port + */ + 
@AutoMetric + protected long incomingTuplesCount; --- End diff -- Can you make this private? > Enhance CSV Formatter to take in schema similar to Csv Parser > - > > Key: APEXMALHAR-2105 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2105 > Project: Apache Apex Malhar > Issue Type: Improvement >Reporter: shubham pathak > > CSV Parser takes in a schema that specifies field names and constraints. CSV > Formatter also needs the same information, but in the current implementation, it > takes it as "fieldInfo". Enhancing CSV Formatter to support the same schema > as CSV Parser would make it simpler for the end user. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Reopened] (APEXMALHAR-175) KeyValPair isn't automatically sticky partitioned for key
[ https://issues.apache.org/jira/browse/APEXMALHAR-175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ilya Ganelin reopened APEXMALHAR-175: - Assignee: Ilya Ganelin This issue shouldn't be closed until usage of KeyValPair is replaced with KeyHashValPair where possible. > KeyValPair isn't automatically sticky partitioned for key > - > > Key: APEXMALHAR-175 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-175 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Pramod Immaneni >Assignee: Ilya Ganelin > > KeyValPair is being used in some operators with the assumption that it will > be sticky partitioned for key but it doesn't override hashcode function > returning the key hashcode. KeyValPair extends AbstractMap.SimpleEntry which > overrides hashcode and it is a function of both key and value. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (APEXMALHAR-175) KeyValPair isn't automatically sticky partitioned for key
[ https://issues.apache.org/jira/browse/APEXMALHAR-175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310767#comment-15310767 ] Ilya Ganelin edited comment on APEXMALHAR-175 at 6/1/16 6:00 PM: - This issue is resolved by using the existing KeyHashValPair class instead. was (Author: ilganeli): This issue is resolved by using the KeyHashValPair class instead. > KeyValPair isn't automatically sticky partitioned for key > - > > Key: APEXMALHAR-175 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-175 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Pramod Immaneni > > KeyValPair is being used in some operators with the assumption that it will > be sticky partitioned for key but it doesn't override hashcode function > returning the key hashcode. KeyValPair extends AbstractMap.SimpleEntry which > overrides hashcode and it is a function of both key and value. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
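The hashcode issue behind this ticket is easy to see in isolation: AbstractMap.SimpleEntry hashes key and value together, so hash-based (sticky) partitioning by tuple hashcode can scatter pairs that share a key. Hashing by the key alone, as KeyHashValPair is meant to, keeps same-key pairs on one partition. The KeyHashPair class and partition() helper below are illustrative stand-ins, not the Malhar implementation; the hashCode-only override also deliberately ignores the equals/hashCode contract for brevity.

```java
import java.util.AbstractMap;

public class StickyHashSketch
{
  // Hypothetical stand-in for KeyHashValPair: hash by key only.
  public static class KeyHashPair<K, V> extends AbstractMap.SimpleEntry<K, V>
  {
    public KeyHashPair(K key, V value)
    {
      super(key, value);
    }

    @Override
    public int hashCode()
    {
      return getKey() == null ? 0 : getKey().hashCode();
    }
  }

  // Typical sticky partition assignment: hash modulo partition count.
  public static int partition(Object tuple, int numPartitions)
  {
    return Math.floorMod(tuple.hashCode(), numPartitions);
  }

  public static void main(String[] args)
  {
    // SimpleEntry hashes key XOR value: same key, different values,
    // so the two pairs generally land on different partitions.
    AbstractMap.SimpleEntry<String, Integer> a = new AbstractMap.SimpleEntry<>("k", 1);
    AbstractMap.SimpleEntry<String, Integer> b = new AbstractMap.SimpleEntry<>("k", 2);
    System.out.println(partition(a, 4) + " vs " + partition(b, 4));

    // Key-only hash: same key always maps to the same partition.
    KeyHashPair<String, Integer> c = new KeyHashPair<>("k", 1);
    KeyHashPair<String, Integer> d = new KeyHashPair<>("k", 2);
    System.out.println(partition(c, 4) == partition(d, 4));
  }
}
```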
[jira] [Commented] (APEXMALHAR-998) Compilation error while using UniqueValueCount operator.
[ https://issues.apache.org/jira/browse/APEXMALHAR-998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310546#comment-15310546 ] ASF GitHub Bot commented on APEXMALHAR-998: --- Github user asfgit closed the pull request at: https://github.com/apache/incubator-apex-malhar/pull/305 > Compilation error while using UniqueValueCount operator. > > > Key: APEXMALHAR-998 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-998 > Project: Apache Apex Malhar > Issue Type: Bug >Reporter: Tushar Gosavi >Assignee: Tushar Gosavi > Fix For: 3.5.0 > > > Got compilation error while using UniqueValueCount operator, > [ERROR] bootstrap class path not set in conjunction with -source 1.6 > /home/tushar/work/github/Malhar/demos/src/main/java/com/datatorrent/demos/uniquevaluetest/Application.java:[31,11] > error: no suitable method found for > addStream(String,DefaultOutputPort>,DefaultInputPort >) > The problem is type KeyValPair is different than type > used in operator, generic type needs to be extended from > Object for typematch. > The fix is > --- a/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java > +++ b/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java > @@ -46,10 +46,9 @@ public class UniqueValueCount extends BaseOperator { > > > @InputPortFieldAnnotation(name="inputPort") > -public transient DefaultInputPort > inputPort = new > DefaultInputPort >() { > - > +public transient DefaultInputPort > > inputPort = new DefaultInputPort >() { > @Override > -public void process(KeyValPair pair) { > +public void process(KeyValPair pair) { > Set values= interimUniqueValues.get(pair.getKey()); > if(values==null){ > values=Sets.newHashSet(); -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (APEXMALHAR-998) Compilation error while using UniqueValueCount operator.
[ https://issues.apache.org/jira/browse/APEXMALHAR-998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise resolved APEXMALHAR-998. - Resolution: Fixed Fix Version/s: 3.5.0 > Compilation error while using UniqueValueCount operator. > > > Key: APEXMALHAR-998 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-998 > Project: Apache Apex Malhar > Issue Type: Bug >Reporter: Tushar Gosavi >Assignee: Tushar Gosavi > Fix For: 3.5.0 > > > Got compilation error while using UniqueValueCount operator, > [ERROR] bootstrap class path not set in conjunction with -source 1.6 > /home/tushar/work/github/Malhar/demos/src/main/java/com/datatorrent/demos/uniquevaluetest/Application.java:[31,11] > error: no suitable method found for > addStream(String,DefaultOutputPort>,DefaultInputPort >) > The problem is type KeyValPair is different than type > used in operator, generic type needs to be extended from > Object for typematch. > The fix is > --- a/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java > +++ b/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java > @@ -46,10 +46,9 @@ public class UniqueValueCount extends BaseOperator { > > > @InputPortFieldAnnotation(name="inputPort") > -public transient DefaultInputPort > inputPort = new > DefaultInputPort >() { > - > +public transient DefaultInputPort > > inputPort = new DefaultInputPort >() { > @Override > -public void process(KeyValPair pair) { > +public void process(KeyValPair pair) { > Set values= interimUniqueValues.get(pair.getKey()); > if(values==null){ > values=Sets.newHashSet(); -- This message was sent by Atlassian JIRA (v6.3.4#6332)
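The "no suitable method found for addStream" error in this ticket is a consequence of Java generic invariance: a port declared over one type argument does not match a stream parameterized over another, and the fix bounds the port's type parameter so the arguments line up. The following is a simplified JDK-only analogue of that invariance and the wildcard-style remedy, not the actual Apex port types.

```java
import java.util.Arrays;
import java.util.List;

public class InvarianceSketch
{
  // Rejects List<Integer> at compile time: generic types are invariant,
  // so List<Integer> is not a List<Number>.
  static double sumRigid(List<Number> xs)
  {
    double s = 0;
    for (Number n : xs) {
      s += n.doubleValue();
    }
    return s;
  }

  // Accepts List<Integer>, List<Double>, ...: an upper-bounded wildcard
  // plays the same role as the "extends Object" bound in the fix.
  public static double sumFlexible(List<? extends Number> xs)
  {
    double s = 0;
    for (Number n : xs) {
      s += n.doubleValue();
    }
    return s;
  }

  public static void main(String[] args)
  {
    List<Integer> ints = Arrays.asList(1, 2, 3);
    // sumRigid(ints);       // does not compile: incompatible types
    System.out.println(sumFlexible(ints));
  }
}
```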
[jira] [Commented] (APEXMALHAR-2105) Enhance CSV Formatter to take in schema similar to Csv Parser
[ https://issues.apache.org/jira/browse/APEXMALHAR-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310293#comment-15310293 ] ASF GitHub Bot commented on APEXMALHAR-2105: GitHub user shubham-pathak22 opened a pull request: https://github.com/apache/incubator-apex-malhar/pull/306 APEXMALHAR-2105 enhancing CSV formatter to read field info from schema
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/shubham-pathak22/incubator-apex-malhar APEXMALHAR-2105
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-apex-malhar/pull/306.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #306
commit a2a863746613c9b4c594949b5b188b138b145395
Author: shubham  Date: 2016-06-01T13:17:36Z
APEXMALHAR-2105 enhancing CSV formatter to read field info from schema

> Enhance CSV Formatter to take in schema similar to Csv Parser
> -
>
> Key: APEXMALHAR-2105
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2105
> Project: Apache Apex Malhar
> Issue Type: Improvement
> Reporter: shubham pathak
>
> CSV Parser takes in a schema that specifies field names and constraints. CSV
> Formatter also needs the same information, but in the current implementation, it
> takes it as "fieldInfo". Enhancing CSV Formatter to support the same schema
> as CSV Parser would make it simpler for the end user.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] incubator-apex-malhar pull request: APEXMALHAR-998 extend second type parame...
GitHub user tushargosavi opened a pull request: https://github.com/apache/incubator-apex-malhar/pull/305 APEXMALHAR-998 extend second type parameter from Object
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tushargosavi/incubator-apex-malhar APEXMALHAR-998
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-apex-malhar/pull/305.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #305
commit e809bee20b0761e32bf4b1213e0dafa2791178de
Author: Tushar R. Gosavi  Date: 2016-06-01T09:32:27Z
APEXMALHAR-998 extend second type parameter from Object
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (APEXMALHAR-2104) input ports in BytesFileOutputOperator should be optional
[ https://issues.apache.org/jira/browse/APEXMALHAR-2104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309950#comment-15309950 ] ASF GitHub Bot commented on APEXMALHAR-2104: GitHub user yogidevendra opened a pull request: https://github.com/apache/incubator-apex-malhar/pull/304 APEXMALHAR-2104-making input ports optional
1. Using variable shadowing for making port optional.
2. Marking BytesFileOutputOperator as @evolving
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/yogidevendra/incubator-apex-malhar APEXMALHAR-2104-BytesFileOutput-optional-port
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-apex-malhar/pull/304.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #304
commit 85bdecf5d66b907f7f17173668afd6e24d88406c
Author: yogidevendra  Date: 2016-06-01T06:35:31Z
APEXMALHAR-2104-making input ports optional
1. Using variable shadowing for making port optional.
2. Marking this at @evolving

> input ports in BytesFileOutputOperator should be optional
> -
>
> Key: APEXMALHAR-2104
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2104
> Project: Apache Apex Malhar
> Issue Type: Bug
> Reporter: Yogi Devendra
> Assignee: Yogi Devendra
>
> BytesFileOutputOperator has two ports one is for byte[] input other is for
> String input.
> We should allow app developers to connect to any one of this port.
> Thus, both the streams should be marked as optional.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (APEXMALHAR-2104) input ports in BytesFileOutputOperator should be optional
[ https://issues.apache.org/jira/browse/APEXMALHAR-2104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yogi Devendra updated APEXMALHAR-2104: -- Priority: Minor (was: Major) > input ports in BytesFileOutputOperator should be optional > - > > Key: APEXMALHAR-2104 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2104 > Project: Apache Apex Malhar > Issue Type: Bug >Reporter: Yogi Devendra >Assignee: Yogi Devendra >Priority: Minor > > BytesFileOutputOperator has two ports one is for byte[] input other is for > String input. > We should allow app developers to connect to any one of this port. > Thus, both the streams should be marked as optional. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXMALHAR-2104) input ports in BytesFileOutputOperator should be optional
[ https://issues.apache.org/jira/browse/APEXMALHAR-2104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309364#comment-15309364 ] Yogi Devendra commented on APEXMALHAR-2104: --- The input port used for byte[] is defined in the parent class. Since there is no direct way to override annotations on an input port defined in the parent class, this port is shadowed using a port with the same name in BytesFileOutputOperator. > input ports in BytesFileOutputOperator should be optional > - > > Key: APEXMALHAR-2104 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2104 > Project: Apache Apex Malhar > Issue Type: Bug >Reporter: Yogi Devendra >Assignee: Yogi Devendra > > BytesFileOutputOperator has two ports one is for byte[] input other is for > String input. > We should allow app developers to connect to any one of this port. > Thus, both the streams should be marked as optional. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
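The field-shadowing trick described in this comment can be sketched with plain reflection: a subclass field with the same name hides the parent's field, and an annotation lookup via getDeclaredField on the subclass resolves to the shadowing field, so its relaxed annotation is the one seen. The @PortAnnotation below is a hypothetical stand-in for Apex's InputPortFieldAnnotation, and the Object-typed fields stand in for ports.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Field;

public class ShadowSketch
{
  @Retention(RetentionPolicy.RUNTIME)
  @interface PortAnnotation
  {
    boolean optional();
  }

  public static class BaseOp
  {
    @PortAnnotation(optional = false)
    public Object input = new Object();
  }

  public static class BytesOp extends BaseOp
  {
    // Shadows BaseOp.input: same name, relaxed annotation.
    @PortAnnotation(optional = true)
    public Object input = new Object();
  }

  // Reads the annotation from the field declared directly on the given class.
  public static boolean isOptional(Class<?> cls)
  {
    try {
      Field f = cls.getDeclaredField("input");
      return f.getAnnotation(PortAnnotation.class).optional();
    } catch (NoSuchFieldException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args)
  {
    System.out.println(isOptional(BaseOp.class));   // false
    System.out.println(isOptional(BytesOp.class));  // true
  }
}
```

One caveat of this approach, mirrored in the PR discussion: code that reads the port through a BaseOp-typed reference still sees the parent field, so the framework must scan the concrete operator class for this to take effect.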