[jira] [Commented] (APEXCORE-339) Support ability to tag operators as idempotent or non-idempotent

2016-06-07 Thread Thomas Weise (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319830#comment-15319830
 ] 

Thomas Weise commented on APEXCORE-339:
---

[~PramodSSImmaneni] can you please provide more information about the 
implementation approach here?


> Support ability to tag operators as idempotent or non-idempotent
> 
>
> Key: APEXCORE-339
> URL: https://issues.apache.org/jira/browse/APEXCORE-339
> Project: Apache Apex Core
> Issue Type: Bug
> Reporter: Pramod Immaneni
> Assignee: Pramod Immaneni
>
> Certain applications require idempotency and some others don't. Additionally,
> operators such as input operators need to be specially instrumented to
> exhibit idempotent behavior. Certain output operators, like database operators,
> rely on idempotency. For these reasons we need the ability to label operators as
> idempotent or not, and to state whether output operators require idempotency from
> upstream operators. In case of a failure the platform should handle
> non-idempotent streams by restarting the failed operator and all downstream
> operators at the same checkpoint.
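
The recovery rule in the description (restart the failed operator and every downstream operator from the same checkpoint) can be pictured as a graph traversal. The following is only a sketch under assumptions: `RecoverySketch`, `operatorsToRestart`, and the adjacency-map representation of the DAG are hypothetical names for illustration and are not part of Apex.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; none of these names exist in Apex.
class RecoverySketch {
  /**
   * Returns the failed operator plus every operator reachable downstream of it,
   * i.e. the set that would be restarted from a common checkpoint.
   */
  static Set<String> operatorsToRestart(Map<String, List<String>> downstream, String failed) {
    Set<String> restart = new LinkedHashSet<>();
    Deque<String> pending = new ArrayDeque<>();
    pending.add(failed);
    while (!pending.isEmpty()) {
      String op = pending.poll();
      // add() returns false for operators already visited, so shared
      // downstream operators are expanded only once.
      if (restart.add(op)) {
        pending.addAll(downstream.getOrDefault(op, List.of()));
      }
    }
    return restart;
  }

  public static void main(String[] args) {
    Map<String, List<String>> dag = Map.of(
        "input", List.of("parser"),
        "parser", List.of("dbOutput", "fileOutput"));
    System.out.println(operatorsToRestart(dag, "parser"));
  }
}
```

In this sketch a failure of "parser" pulls in both output operators, while "input" is left alone because it is upstream of the failure.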



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] apex-core pull request #349: APEXCORE-470 New API in DAG - setOperatorAttrib...

2016-06-07 Thread PramodSSImmaneni
Github user PramodSSImmaneni commented on a diff in the pull request:

https://github.com/apache/apex-core/pull/349#discussion_r66172343
  
--- Diff: api/src/main/java/com/datatorrent/api/DAG.java ---
@@ -238,9 +238,15 @@
   /**
    * setAttribute.
    */
+  @Deprecated
   public abstract <T> void setAttribute(Operator operator, Attribute<T> key, T value);
 
   /**
+   * setOperatorAttribute.
--- End diff --

javadoc


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (APEXCORE-470) New Api for setting the attribute on the operator ( setOperatorAttribute )

2016-06-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319709#comment-15319709
 ] 

ASF GitHub Bot commented on APEXCORE-470:
-

Github user PramodSSImmaneni commented on a diff in the pull request:

https://github.com/apache/apex-core/pull/349#discussion_r66172343
  
--- Diff: api/src/main/java/com/datatorrent/api/DAG.java ---
@@ -238,9 +238,15 @@
   /**
    * setAttribute.
    */
+  @Deprecated
   public abstract <T> void setAttribute(Operator operator, Attribute<T> key, T value);
 
   /**
+   * setOperatorAttribute.
--- End diff --

javadoc


> New Api for setting the attribute on the operator ( setOperatorAttribute )
> --
>
> Key: APEXCORE-470
> URL: https://issues.apache.org/jira/browse/APEXCORE-470
> Project: Apache Apex Core
> Issue Type: Improvement
> Reporter: Sandesh
> Assignee: Sandesh
>
> Currently, *setAttribute* is used to set the operator attributes. The other two
> attribute-setting APIs are specific to input ports (*setInputPortAttributes*)
> and output ports (*setOutputPortsAttributes*).
> The proposal is to add a *setOperatorAttribute* API, which clearly indicates
> that the user wants to set attributes on the operator:
> setOperatorAttribute(Operator operator, Attribute<T> key, T value)
> The roles of the APIs will then be:
> *setAttribute* --> for setting attributes for the whole DAG (the overload
> setAttribute(Operator operator, Attribute<T> key, T value) can be
> deprecated)
> *setOperatorAttribute* --> for setting attributes for the operator
> All the unit test cases using the previous API will be updated as part of
> this change.
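
A minimal sketch of how the proposed method and the deprecated overload could sit side by side. The types below are hypothetical stand-ins written for this illustration (the real `DAG`, `Operator`, and `Attribute` live in com.datatorrent.api), and having the old method forward to the new one is an assumption, not something the issue states.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration; not the real com.datatorrent.api types.
class AttributeSketch {
  static final class Attribute<T> {
    final String name;

    Attribute(String name) {
      this.name = name;
    }
  }

  static final class Operator {
    final Map<Attribute<?>, Object> attributes = new HashMap<>();
  }

  static class Dag {
    /** Proposed name: makes it explicit that the attribute targets an operator. */
    <T> void setOperatorAttribute(Operator operator, Attribute<T> key, T value) {
      operator.attributes.put(key, value);
    }

    /** Old name, kept for compatibility; forwarding is an assumption of this sketch. */
    @Deprecated
    <T> void setAttribute(Operator operator, Attribute<T> key, T value) {
      setOperatorAttribute(operator, key, value);
    }
  }

  public static void main(String[] args) {
    Attribute<Integer> memoryMb = new Attribute<>("MEMORY_MB");
    Operator op = new Operator();
    new Dag().setOperatorAttribute(op, memoryMb, 2048);
    System.out.println(op.attributes.get(memoryMb));
  }
}
```

Keeping the deprecated overload as a thin forwarder would let existing callers compile unchanged while the tests move to the new name.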





[GitHub] apex-core pull request #349: APEXCORE-470

2016-06-07 Thread sandeshh
GitHub user sandeshh opened a pull request:

https://github.com/apache/apex-core/pull/349

APEXCORE-470 

1. Added the new API - setOperatorAttribute 
2. updated the unit tests,
3. marked "setAttribute(Operator..."  as deprecated

@PramodSSImmaneni please review.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sandeshh/apex-core APEXCORE-470

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/349.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #349


commit bcd3426ba65f5291925dedb1e4fdfef4c80096ed
Author: sandeshh 
Date:   2016-06-07T23:27:26Z

APEXCORE-470 - Added the new api setOperatorAttribute and updated the 
tests, also marked setAttribute(Operator... as deprecated






[GitHub] apex-malhar pull request #282: APEXMALHAR-2066 JdbcPolling,idempotent,partit...

2016-06-07 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/282#discussion_r66169324
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -0,0 +1,652 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.validation.constraints.Min;
+
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.commons.lang3.tuple.MutablePair;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.Operator.IdleTimeHandler;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.lib.db.AbstractStoreInputOperator;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Abstract operator for consuming data using the JDBC interface.
+ * The user needs to provide
+ * tableName, dbConnection, setEmitColumnList, look-up key
+ * Optionally batchSize, pollInterval, look-up key and a where clause can be given
+ * 
+ * This operator uses static partitioning to arrive at range queries for exactly
+ * once reads
+ * Assumption is that there is an ordered column using which range queries can
+ * be formed
+ * If an emitColumnList is provided, please ensure that the keyColumn is the
+ * first column in the list
+ * Range queries are formed using the {@link JdbcMetaDataUtility} Output -
+ * comma separated list of the emit columns eg columnA,columnB,columnC
+ * 
+ * @displayName Jdbc Polling Input Operator
+ * @category Input
+ * @tags database, sql, jdbc, partitionable,exactlyOnce
+ */
+public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator
+implements ActivationListener, IdleTimeHandler, Partitioner
+{
+  /*
+   * poll interval in milliseconds
+   */
+  private int pollInterval;
+
+  @Min(1)
+  private int partitionCount = 1;
+  protected transient int operatorId;
+  protected transient boolean isReplayed;
+  protected transient boolean isPollable;
+  protected int batchSize;
+  protected int fetchSize;
+  /**
+   * Map of windowId to  of the range key
+   */
+  protected transient MutablePair currentWindowRecoveryState;
+
+  /**
+   * size of the emit queue used to hold polled records before emit
+   */
+  private int queueCapacity = 4 * 1024 * 1024;
+  private transient volatile boolean execute;
+  private transient AtomicReference cause;
+  protected transient int spinMillis;
+  private transient OperatorContext context;
+  protected String tableName;
+  protected String key;
+  protected long currentWindowId;
+  protected KeyValPair rangeQueryPair;
+  protected String lower;
+  protected String upper;
+  protected boolean recovered;
+  protected boolean isPolled;
+  protected String whereCondition = null;
--- End diff --

Can you also clarify which subset of SQL will be supported here? How 
complex can the where clause be?
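
To make the question concrete, here is a rough sketch of how a user-supplied where clause might be combined with the per-partition range predicates described in the class javadoc. Everything in it is an assumption for illustration: `RangeQuerySketch` and `rangeQueries` are hypothetical names, numeric keys are assumed, and this is not the PR's `JdbcMetaDataUtility`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; this is not the PR's JdbcMetaDataUtility.
class RangeQuerySketch {
  /** Splits [lower, upper) into partitionCount contiguous ranges and builds one query per range. */
  static List<String> rangeQueries(String table, String keyColumn, long lower, long upper,
      int partitionCount, String whereCondition) {
    List<String> queries = new ArrayList<>();
    long span = upper - lower;
    for (int i = 0; i < partitionCount; i++) {
      long from = lower + span * i / partitionCount;
      long to = lower + span * (i + 1) / partitionCount;
      StringBuilder sql = new StringBuilder("SELECT * FROM " + table + " WHERE ");
      if (whereCondition != null && !whereCondition.isEmpty()) {
        // Parenthesized so an OR inside the user's condition cannot escape the range.
        sql.append('(').append(whereCondition).append(") AND ");
      }
      sql.append(keyColumn).append(" >= ").append(from)
          .append(" AND ").append(keyColumn).append(" < ").append(to);
      queries.add(sql.toString());
    }
    return queries;
  }

  public static void main(String[] args) {
    rangeQueries("orders", "id", 0, 100, 4, "status = 'OPEN'").forEach(System.out::println);
  }
}
```

With this shape, any boolean expression over the table's columns would work as the where clause, but anything that changes the result shape (ORDER BY, GROUP BY, sub-selects that reference the key range) would not compose cleanly. The sketch concatenates strings only for readability; it is not safe for untrusted input.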



[GitHub] apex-malhar pull request #314: APEXMALHAR-2113 bug fix, app test case,update...

2016-06-07 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/314#discussion_r66168863
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
 ---
@@ -34,25 +33,30 @@
 import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator;
 
 /**
- * This is the base class implementation of a transactionable JDBC output operator.
- * Subclasses should implement the method which provides the insertion command.
+ * This is the base class implementation of a transactionable JDBC output
--- End diff --

Please remove the formatting/checkstyle changes from this PR. Let's keep 
only the relevant changes here.




[jira] [Commented] (APEXCORE-339) Support ability to tag operators as idempotent or non-idempotent

2016-06-07 Thread Sandesh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319648#comment-15319648
 ] 

Sandesh commented on APEXCORE-339:
--

How does this intersect with the EXACTLY_ONCE ProcessingMode?






[jira] [Comment Edited] (APEXCORE-339) Support ability to tag operators as idempotent or non-idempotent

2016-06-07 Thread Pramod Immaneni (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15225223#comment-15225223
 ] 

Pramod Immaneni edited comment on APEXCORE-339 at 6/7/16 10:54 PM:
---

Follow development here
https://github.com/PramodSSImmaneni/incubator-apex-core/tree/APEXCORE-339


was (Author: pramodssimmaneni):
Follow development here
https://github.com/PramodSSImmaneni/incubator-apex-core/tree/idempotence-recovery






[GitHub] apex-malhar pull request #295: APEXMALHAR-1966: Update casandra output oprea...

2016-06-07 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/295#discussion_r66164298
  
--- Diff: 
benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java
 ---
@@ -48,4 +48,9 @@ protected Statement setStatementParameters(PreparedStatement updateCommand,
 return boundStmnt.bind(id++,tuple);
   }
 
+  @Override
+  public void deactivate()
--- End diff --

Can you move this to the Abstract parent class? Otherwise every 
implementation would have to include this method.
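
The suggestion amounts to implementing the lifecycle hook once in the abstract base so that concrete operators inherit it. A small sketch of that shape follows; the class and method names are hypothetical placeholders for illustration, not the actual Malhar Cassandra classes.

```java
// Hypothetical names; these are not the actual Malhar Cassandra classes.
class DeactivateSketch {
  interface ActivationListener {
    void activate();

    void deactivate();
  }

  /** Shared lifecycle code lives in the abstract parent... */
  abstract static class AbstractOutputOperator implements ActivationListener {
    boolean active;

    @Override
    public void activate() {
      active = true;
    }

    @Override
    public void deactivate() {
      active = false;
    }

    abstract String statementFor(String tuple);
  }

  /** ...so a concrete operator only supplies what is specific to it. */
  static class BenchmarkOutputOperator extends AbstractOutputOperator {
    @Override
    String statementFor(String tuple) {
      return "INSERT " + tuple;
    }
  }

  public static void main(String[] args) {
    AbstractOutputOperator op = new BenchmarkOutputOperator();
    op.activate();
    op.deactivate();
    System.out.println(op.active);
  }
}
```

A subclass that needs extra cleanup could still override `deactivate()` and call `super.deactivate()`.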




[jira] [Commented] (APEXMALHAR-1966) Cassandra output operator improvements

2016-06-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319589#comment-15319589
 ] 

ASF GitHub Bot commented on APEXMALHAR-1966:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/295#discussion_r66164298
  
--- Diff: 
benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java
 ---
@@ -48,4 +48,9 @@ protected Statement setStatementParameters(PreparedStatement updateCommand,
 return boundStmnt.bind(id++,tuple);
   }
 
+  @Override
+  public void deactivate()
--- End diff --

Can you move this to the Abstract parent class? Otherwise every 
implementation would have to include this method.


> Cassandra output operator improvements
> --
>
> Key: APEXMALHAR-1966
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1966
> Project: Apache Apex Malhar
> Issue Type: Improvement
> Reporter: Priyanka Gugale
> Assignee: Priyanka Gugale
>
> Update existing Cassandra output operator to:
> 1. Accept user-defined parameterized queries; the queries could be for update,
> insert or delete.
> 2. Add error port to emit tuples which couldn't be written to database.
> 3. Add metrics
> 4. Provide a way to restrict batch size





[GitHub] apex-malhar pull request #301: [APEXMALHAR-2094] Quantiles operator

2016-06-07 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/301#discussion_r66163272
  
--- Diff: 
sketches/src/main/java/org/apache/apex/malhar/sketches/QuantilesEstimator.java 
---
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sketches;
+
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * An implementation of BaseOperator that computes a "sketch" (a representation of the probability distribution using
+ * a low memory footprint) of the incoming numeric data, and evaluates/outputs the cumulative distribution function and
+ * quantiles of the probability distribution. Leverages the quantiles sketch implementation from the Yahoo Datasketches
+ * Library.
+ * 
+ * Input Port(s) : 
+ * data :  Data values input port. 
+ * 
+ * Output Port(s) :  
+ * cdfOutput : cumulative distribution function output port. 
+ * quantilesOutput : quantiles output port. 
+ * 
+ * Partitions :  No unifier. Merging these sketches is non-trivial.
+ *
+ */
+@OperatorAnnotation(partitionable = false)
+public class QuantilesEstimator extends BaseOperator
+{
+  /**
+   * Constructor that allows non-default initialization of the quantile sketch object
+   *
+   * @param k:Parameter that determines accuracy and memory usage of quantile sketch. See QuantilesSketch
+   *  documentation for details
+   * @param seed: The quantile sketch algorithm is inherently random. Set seed to 0 for reproducibility in testing, but
+   *  do not set otherwise.
+   */
+  public QuantilesEstimator(int k, short seed)
+  {
+quantilesSketch = QuantilesSketch.builder().setSeed(seed).build(k);
+  }
+
+  private transient QuantilesSketch quantilesSketch = QuantilesSketch.builder().build();
+
+  /**
+   * This field determines the specific quantiles to be calculated. For a stream of numbers, the quantile at a value
+   * 0 <= p <= 1 is the number x such that a fraction p of the numbers in the sorted stream are less than x. E.g., the
+   * quantile at p = 0.5 is the median (half the numbers in the stream are less than the median).
+   * The default is set to compute the standard quartiles (4-quantiles).
+   */
+  private double[] fractions = {0.0, 0.25, 0.50, 0.75, 1.00};
+  /**
+   * This field determines the intervals on which the probability mass function is computed.
+   */
+  private double[] pmfIntervals = {};
+
+  /**
+   * This operator computes three different quantities which are output on separate output ports. If not using any of
+   * these quantities, these variables can be set to avoid unnecessary computation.
+   */
+  private boolean computeCdf = true;
+  private boolean computeQuantiles = true;
+  private boolean computePmf = true;
+
+  public boolean isComputeCdf()
+  {
+return computeCdf;
+  }
+
+  public void setComputeCdf(boolean computeCdf)
+  {
+this.computeCdf = computeCdf;
+  }
+
+  public boolean isComputeQuantiles()
+  {
+return computeQuantiles;
+  }
+
+  public void setComputeQuantiles(boolean computeQuantiles)
+  {
+this.computeQuantiles = computeQuantiles;
+  }
+
+  public boolean isComputePmf()
+  {
+return computePmf;
+  }
+
+  public void setComputePmf(boolean computePmf)
+  {
+this.computePmf = computePmf;
+  }
+
+  public int getK()
+  {
+return quantilesSketch.getK();
+  }
+
+  public double[] getFractions()
+  {
+return fractions;
+  }
+
+  public void setFractions(double[] 
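
For readers following the javadoc above, the quantile definition it gives can be illustrated with an exact computation on a small array. This is only a sketch under assumptions: `QuantileSketchDemo` and its rank convention (element at rank floor(p * (n - 1)) of the sorted data) are choices made for this illustration, whereas the operator under review relies on the approximate sketch from the Yahoo DataSketches library. Non-empty input is assumed.

```java
import java.util.Arrays;

// Exact computation on a small array, for illustration only; not the PR's code.
class QuantileSketchDemo {
  /** Returns, for each fraction p in [0, 1], the element at rank floor(p * (n - 1)) of the sorted data. */
  static double[] quantiles(double[] data, double[] fractions) {
    double[] sorted = data.clone();
    Arrays.sort(sorted);
    double[] result = new double[fractions.length];
    for (int i = 0; i < fractions.length; i++) {
      int rank = (int)Math.floor(fractions[i] * (sorted.length - 1));
      result[i] = sorted[rank];
    }
    return result;
  }

  public static void main(String[] args) {
    double[] data = {5, 1, 4, 2, 3};
    double[] fractions = {0.0, 0.25, 0.50, 0.75, 1.00};
    System.out.println(Arrays.toString(quantiles(data, fractions)));
  }
}
```

The default `fractions` array in the diff corresponds to the minimum, the three quartiles, and the maximum.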

[GitHub] apex-malhar pull request #301: [APEXMALHAR-2094] Quantiles operator

2016-06-07 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/301#discussion_r66163056
  
--- Diff: sketches/pom.xml ---
@@ -0,0 +1,75 @@
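+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>malhar</artifactId>
+    <groupId>org.apache.apex</groupId>
+    <version>3.5.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>sketches</artifactId>
+  <name>Apache Apex Malhar Sketch Library</name>
+  <packaging>jar</packaging>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+            <phase>package</phase>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.yahoo.datasketches</groupId>
+      <artifactId>sketches-core</artifactId>
+      <version>0.4.1</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>malhar-library</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>malhar-library</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+
+  </dependencies>
+
+</project>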
--- End diff --

add newline




[jira] [Commented] (APEXMALHAR-2094) Quantiles sketch operator

2016-06-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319569#comment-15319569
 ] 

ASF GitHub Bot commented on APEXMALHAR-2094:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/301#discussion_r66163010
  
--- Diff: 
sketches/src/main/java/org/apache/apex/malhar/sketches/QuantilesEstimator.java 
---
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sketches;
+
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * An implementation of BaseOperator that computes a "sketch" (a 
representation of the probability distribution using
+ * a low memory footprint) of the incoming numeric data, and 
evaluates/outputs the cumulative distribution function and
+ * quantiles of the probability distribution. Leverages the quantiles 
sketch implementation from the Yahoo Datasketches
+ * Library.
+ * 
+ * Input Port(s) : 
+ * data :  Data values input port. 
+ * 
+ * Output Port(s) :  
+ * cdfOutput : cumulative distribution function output port. 
+ * quantilesOutput : quantiles output port. 
+ * 
+ * Partitions :  No unifier. Merging these sketches is non-trivial. 

+ * +
+ */
+@OperatorAnnotation(partitionable = false)
+public class QuantilesEstimator extends BaseOperator
+{
+  /**
+   * Constructor that allows non-default initialization of the quantile 
sketch object
+   *
+   * @param k:Parameter that determines accuracy and memory usage of 
quantile sketch. See QuantilesSketch
+   *  documentation for details
+   * @param seed: The quantile sketch algorithm is inherently random. Set 
seed to 0 for reproducibility in testing, but
+   *  do not set otherwise.
+   */
+  public QuantilesEstimator(int k, short seed)
--- End diff --

You might also need to add a default constructor for Kryo serialization to 
work correctly.
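
The review comment above can be illustrated with a minimal sketch. It assumes that the serializer creates instances reflectively through a zero-argument constructor, which is my understanding of Kryo's default behaviour. `SketchOperatorExample`, its fields, and the default value 128 are hypothetical stand-ins chosen for illustration; they are not taken from the pull request.

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-in for an operator class; not the class from the pull request.
class SketchOperatorExample
{
  private final int k;
  private final short seed;

  // Zero-argument constructor so that reflective instantiation can succeed.
  // It may be private; the default of 128 is an arbitrary value for this sketch.
  private SketchOperatorExample()
  {
    this(128, (short)0);
  }

  SketchOperatorExample(int k, short seed)
  {
    this.k = k;
    this.seed = seed;
  }

  int getK()
  {
    return k;
  }

  short getSeed()
  {
    return seed;
  }

  // Creates an instance the way a reflection-based framework might:
  // by looking up the zero-argument constructor and invoking it.
  static SketchOperatorExample newViaReflection()
  {
    try {
      Constructor<SketchOperatorExample> ctor = SketchOperatorExample.class.getDeclaredConstructor();
      ctor.setAccessible(true);
      return ctor.newInstance();
    } catch (ReflectiveOperationException e) {
      // Without the zero-argument constructor, getDeclaredConstructor() would fail here.
      throw new IllegalStateException("no usable zero-argument constructor", e);
    }
  }

  public static void main(String[] args)
  {
    System.out.println(newViaReflection().getK());
  }
}
```

If the zero-argument constructor were removed, the lookup in `newViaReflection()` would fail with a `NoSuchMethodException`, which is the kind of failure the comment warns about.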


> Quantiles sketch operator
> -
>
> Key: APEXMALHAR-2094
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2094
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Sandeep Narayanaswami
>Assignee: Sandeep Narayanaswami
>Priority: Minor
>
> An operator that "sketches" in an online fashion the probability distribution 
> of an input (numeric) data stream, enabling computation of quantiles and 
> cumulative distribution functions.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] apex-malhar pull request #301: [APEXMALHAR-2094] Quantiles operator

2016-06-07 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/301#discussion_r66163010
  
--- Diff: 
sketches/src/main/java/org/apache/apex/malhar/sketches/QuantilesEstimator.java 
---
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sketches;
+
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * An implementation of BaseOperator that computes a "sketch" (a 
representation of the probability distribution using
+ * a low memory footprint) of the incoming numeric data, and 
evaluates/outputs the cumulative distribution function and
+ * quantiles of the probability distribution. Leverages the quantiles 
sketch implementation from the Yahoo Datasketches
+ * Library.
+ * 
+ * Input Port(s) : 
+ * data :  Data values input port. 
+ * 
+ * Output Port(s) :  
+ * cdfOutput : cumulative distribution function output port. 
+ * quantilesOutput : quantiles output port. 
+ * 
+ * Partitions :  No unifier. Merging these sketches is non-trivial. 

+ * +
+ */
+@OperatorAnnotation(partitionable = false)
+public class QuantilesEstimator extends BaseOperator
+{
+  /**
+   * Constructor that allows non-default initialization of the quantile 
sketch object
+   *
+   * @param k:Parameter that determines accuracy and memory usage of 
quantile sketch. See QuantilesSketch
+   *  documentation for details
+   * @param seed: The quantile sketch algorithm is inherently random. Set 
seed to 0 for reproducibility in testing, but
+   *  do not set otherwise.
+   */
+  public QuantilesEstimator(int k, short seed)
--- End diff --

You might also need to add a default constructor for Kryo serialization to 
work correctly.




[jira] [Commented] (APEXMALHAR-2113) JdbcPOJOOutputOperator

2016-06-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319562#comment-15319562
 ] 

ASF GitHub Bot commented on APEXMALHAR-2113:


GitHub user devtagare opened a pull request:

https://github.com/apache/apex-malhar/pull/314

APEXMALHAR-2113 bug fix, app test case,updated test cases

@bhupeshchawda could you please review and merge

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/devtagare/incubator-apex-malhar 
APEXMALHAR-2113-JdbcPOJOOutputOperatorFix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/314.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #314


commit d10cfb040a530115bf667333be6c299a9a7baaff
Author: devtagare 
Date:   2016-06-07T18:28:34Z

APEXMALHAR-2113 bug fix, app test case,updated test cases




> JdbcPOJOOutputOperator
> --
>
> Key: APEXMALHAR-2113
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2113
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: devendra tagare
>Assignee: devendra tagare
> Fix For: 3.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> getUpdateCommand() is marked as @NotNull in 
> AbstractJdbcTransactionableOutputOperator, which is used by 
> JdbcPOJOOutputOperator.
> This method is referenced during the validation phase of the DAG, but 
> updateCommand is initialized only at setup. This causes DAG 
> initialization to fail with a constraint violation.
> Stack trace below:
> An error occurred trying to launch the application. Server message: 
> javax.validation.ConstraintViolationException: Operator JdbcOutput violates 
> constraints 
> [ConstraintViolationImpl{rootBean=JdbcPOJOOutputOperator{name=null}, 
> propertyPath='updateCommand', message='may not be null', 
> leafBean=JdbcPOJOOutputOperator{name=null}, value=null}]
>  at com.datatorrent.stram.plan.logical.LogicalPlan.validate(LogicalPlan.java:1680)
>  at com.datatorrent.stram.StramClient.<init>(StramClient.java:161)
>  at com.datatorrent.stram.client.StramAppLauncher.launchApp(StramAppLauncher.java:509)
>  at com.datatorrent.stram.cli.DTCli$LaunchCommand.execute(DTCli.java:2050)
>  at com.datatorrent.stram.cli.DTCli.launchAppPackage(DTCli.java:3456)
>  at com.datatorrent.stram.cli.DTCli.access$7100(DTCli.java:106)
>  at com.datatorrent.stram.cli.DTCli$LaunchCommand.execute(DTCli.java:1895)
>  at com.datatorrent.stram.cli.DTCli$3.run(DTCli.java:1449)
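
The ordering problem described in the issue can be sketched in plain Java. This is only an illustration under stated assumptions: `ValidationOrderExample`, its `passesValidation` helper, and the SQL string are hypothetical, and the helper merely stands in for the `@NotNull` constraint check that the platform performs at validation time. It is not the operator's actual code and not the fix from the pull request.

```java
// Hypothetical stand-in that mimics a property which is only populated in setup().
class ValidationOrderExample
{
  private final String tableName;
  private String updateCommand; // stays null until setup() runs

  ValidationOrderExample(String tableName)
  {
    this.tableName = tableName;
  }

  // In the scenario from the issue, a getter like this carries a @NotNull constraint.
  String getUpdateCommand()
  {
    return updateCommand;
  }

  // Builds the command; in the scenario from the issue this happens after validation.
  void setup()
  {
    updateCommand = "INSERT INTO " + tableName + " VALUES (?)";
  }

  // Stand-in for the constraint check that runs when the DAG is validated.
  static boolean passesValidation(ValidationOrderExample op)
  {
    return op.getUpdateCommand() != null;
  }

  public static void main(String[] args)
  {
    ValidationOrderExample op = new ValidationOrderExample("events");
    System.out.println("before setup: " + passesValidation(op));
    op.setup();
    System.out.println("after setup: " + passesValidation(op));
  }
}
```

Because validation runs before `setup()`, the first check sees a null value, which corresponds to the "may not be null" violation in the stack trace.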





[jira] [Commented] (APEXMALHAR-2094) Quantiles sketch operator

2016-06-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319544#comment-15319544
 ] 

ASF GitHub Bot commented on APEXMALHAR-2094:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/301#discussion_r66161142
  
--- Diff: 
sketches/src/main/java/org/apache/apex/malhar/sketches/QuantilesEstimator.java 
---
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sketches;
+
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * An implementation of BaseOperator that computes a "sketch" (a 
representation of the probability distribution using
+ * a low memory footprint) of the incoming numeric data, and 
evaluates/outputs the cumulative distribution function and
+ * quantiles of the probability distribution. Leverages the quantiles 
sketch implementation from the Yahoo Datasketches
+ * Library.
+ * 
+ * Input Port(s) : 
+ * data :  Data values input port. 
+ * 
+ * Output Port(s) :  
+ * cdfOutput : cumulative distribution function output port. 
+ * quantilesOutput : quantiles output port. 
+ * 
+ * Partitions :  No unifier. Merging these sketches is non-trivial. 

+ * +
+ */
+@OperatorAnnotation(partitionable = false)
+public class QuantilesEstimator extends BaseOperator
--- End diff --

Do add the @Evolving annotation as this is a new operator
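
A minimal sketch of what the suggested annotation looks like on a class. It assumes the comment refers to `org.apache.hadoop.classification.InterfaceStability.Evolving`; to keep the example compilable without Hadoop on the classpath, a local stand-in annotation named `Evolving` is declared here, and `EvolvingOperatorExample` is a hypothetical class.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

// Local stand-in for org.apache.hadoop.classification.InterfaceStability.Evolving,
// declared here only so that the sketch compiles on its own.
@Documented
@Retention(RetentionPolicy.RUNTIME)
@interface Evolving
{
}

// Hypothetical operator class marked as still evolving.
@Evolving
class EvolvingOperatorExample
{
  // Reports whether a type carries the stand-in annotation.
  static boolean isMarkedEvolving(Class<?> type)
  {
    return type.isAnnotationPresent(Evolving.class);
  }

  public static void main(String[] args)
  {
    System.out.println(isMarkedEvolving(EvolvingOperatorExample.class));
  }
}
```

In a real operator the import of the Hadoop annotation would replace the local declaration.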


> Quantiles sketch operator
> -
>
> Key: APEXMALHAR-2094
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2094
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Sandeep Narayanaswami
>Assignee: Sandeep Narayanaswami
>Priority: Minor
>
> An operator that "sketches" in an online fashion the probability distribution 
> of an input (numeric) data stream, enabling computation of quantiles and 
> cumulative distribution functions.





[jira] [Commented] (APEXMALHAR-2094) Quantiles sketch operator

2016-06-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319539#comment-15319539
 ] 

ASF GitHub Bot commented on APEXMALHAR-2094:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/301#discussion_r66160956
  
--- Diff: 
sketches/src/main/java/org/apache/apex/malhar/sketches/QuantilesEstimator.java 
---
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sketches;
+
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * An implementation of BaseOperator that computes a "sketch" (a 
representation of the probability distribution using
+ * a low memory footprint) of the incoming numeric data, and 
evaluates/outputs the cumulative distribution function and
+ * quantiles of the probability distribution. Leverages the quantiles 
sketch implementation from the Yahoo Datasketches
+ * Library.
+ * 
+ * Input Port(s) : 
+ * data :  Data values input port. 
+ * 
+ * Output Port(s) :  
+ * cdfOutput : cumulative distribution function output port. 
+ * quantilesOutput : quantiles output port. 
+ * 
+ * Partitions :  No unifier. Merging these sketches is non-trivial. 

+ * +
+ */
+@OperatorAnnotation(partitionable = false)
+public class QuantilesEstimator extends BaseOperator
+{
+  /**
+   * Constructor that allows non-default initialization of the quantile 
sketch object
+   *
+   * @param k:Parameter that determines accuracy and memory usage of 
quantile sketch. See QuantilesSketch
+   *  documentation for details
+   * @param seed: The quantile sketch algorithm is inherently random. Set 
seed to 0 for reproducibility in testing, but
+   *  do not set otherwise.
+   */
+  public QuantilesEstimator(int k, short seed)
+  {
+quantilesSketch = QuantilesSketch.builder().setSeed(seed).build(k);
+  }
+
+  private transient QuantilesSketch quantilesSketch = 
QuantilesSketch.builder().build();
+
+  /**
+   * This field determines the specific quantiles to be calculated. For a 
stream of numbers, the quantile at a value
+   * 0 <= p <= 1 is the number x such that a fraction p of the numbers in 
the sorted stream are less than x. E.g., the
+   * quantile at p = 0.5 is the median (half the numbers in the stream are 
less than the median).
+   * The default is set to compute the standard quartiles (4-quantiles).
+   */
+  private double[] fractions = {0.0, 0.25, 0.50, 0.75, 1.00};
+  /**
+   * This field determines the intervals on which the probability mass 
function is computed.
+   */
+  private double[] pmfIntervals = {};
+
+  /**
+   * This operator computes three different quantities which are output on 
separate output ports. If not using any of
+   * these quantities, these variables can be set to avoid unnecessary 
computation.
+   */
+  private boolean computeCdf = true;
+  private boolean computeQuantiles = true;
+  private boolean computePmf = true;
+
+  public boolean isComputeCdf()
+  {
+return computeCdf;
+  }
+
+  public void setComputeCdf(boolean computeCdf)
--- End diff --

Better to add Javadoc on the getters and setters, since users will access the 
fields through these.
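
As a sketch of the suggestion, documented accessors might look like the following. `DocumentedPropertiesExample` is a hypothetical class; the property names mirror those in the diff, but the Javadoc wording is an assumption and not text from the pull request.

```java
// Hypothetical class showing Javadoc on property accessors.
class DocumentedPropertiesExample
{
  private boolean computeCdf = true;
  private double[] fractions = {0.0, 0.25, 0.50, 0.75, 1.00};

  /**
   * Returns whether the cumulative distribution function is computed.
   *
   * @return true if the CDF is computed for incoming values
   */
  public boolean isComputeCdf()
  {
    return computeCdf;
  }

  /**
   * Enables or disables computation of the cumulative distribution function.
   *
   * @param computeCdf true to compute the CDF, false to skip it
   */
  public void setComputeCdf(boolean computeCdf)
  {
    this.computeCdf = computeCdf;
  }

  /**
   * Returns the fractions at which quantiles are reported.
   *
   * @return fractions p, each with 0 <= p <= 1; 0.5 corresponds to the median
   */
  public double[] getFractions()
  {
    return fractions;
  }

  /**
   * Sets the fractions at which quantiles are reported.
   *
   * @param fractions fractions p, each with 0 <= p <= 1
   */
  public void setFractions(double[] fractions)
  {
    this.fractions = fractions;
  }
}
```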


> Quantiles sketch operator
> -
>
> Key: APEXMALHAR-2094
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2094
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Sandeep Narayanaswami
>Assignee: Sandeep Narayanaswami
>

Re: A proposal for Malhar

2016-06-07 Thread Pramod Immaneni
I wanted to close the loop on this discussion. In general, everyone seemed
to be in favor of this idea, with no serious objections. Folks had good
suggestions, such as documenting the capabilities of operators, coming up
with well-defined criteria for the graduation of operators, and deciding
what to do with existing operators that may not yet be mature or are
unused.

I am going to summarize the key points that resulted from the discussion
and would like to proceed with them.

   - Operators that do not yet provide the key platform capabilities that
   make an operator useful across different applications (such as
   reusability, static or dynamic partitioning, idempotency, and exactly-once
   processing) will still be accepted as long as they are functionally
   correct and have unit tests. They will go into a separate module.
   - The contrib module was suggested as the place for new contributions
   that don't yet have all the platform capabilities and are not yet mature.
   If there are no other suggestions we will go with this one.
   - It was suggested that each operator's documentation list the platform
   capabilities, from the list above, that it currently provides. I will
   document a structure for this in the contribution guidelines.
   - Folks wanted to know what the criteria would be to graduate an
   operator to the big leagues :). I will kick off a separate thread for it,
   as I think it requires its own discussion, and hopefully we can come up
   with a set of guidelines for it.
   - David brought up the state of some of the existing operators and their
   retirement, as well as the layout of operators in Malhar in general and
   how it causes problems with development. I will ask him to lead the
   discussion on that.

Thanks

On Fri, May 27, 2016 at 7:47 PM, David Yan  wrote:

> The two ideas are not conflicting, but rather complementing.
>
> On the contrary, putting a new process for people trying to contribute
> while NOT addressing the old unused subpar operators in the repository is
> what is conflicting.
>
> Keep in mind that when people try to contribute, they always look at the
> existing operators already in the repository as examples and likely a model
> for their new operators.
>
> David
>
>
> On Fri, May 27, 2016 at 4:05 PM, Amol Kekre  wrote:
>
> > Yes there are two conflicting threads now. The original thread was to open
> > up a way for contributors to submit code in a dir (contrib?) as long as the
> > license part is taken care of.
> >
> > On the thread of removing non-used operators -> How do we know what is
> > being used?
> >
> > Thks,
> > Amol
> >
> >
> > On Fri, May 27, 2016 at 3:40 PM, Sandesh Hegde 
> > wrote:
> >
> > > +1 for removing the not-used operators.
> > >
> > > So we are creating a process for operator writers who don't want to
> > > understand the platform, yet want to contribute? How big is that set?
> > > If we tell the app user that here is code which has not passed the whole
> > > checklist, will they be ready to use it in production?
> > >
> > > This thread has 2 conflicting forces: reduce the operators and make it
> > > easy to add more operators.
> > >
> > >
> > >
> > > On Fri, May 27, 2016 at 3:03 PM Pramod Immaneni <pra...@datatorrent.com>
> > > wrote:
> > >
> > > > On Fri, May 27, 2016 at 2:30 PM, Gaurav Gupta <gaurav.gopi...@gmail.com>
> > > > wrote:
> > > >
> > > > > Pramod,
> > > > >
> > > > > By that logic I would say let's put all partitionable operators into one
> > > > > folder, non-partitionable operators in another and so on...
> > > > >
> > > >
> > > > Remember the original goal of making it easier for new members to
> > > > contribute and managing those contributions to maturity. It is not a
> > > > functional level separation.
> > > >
> > > >
> > > > > When I look at hadoop code I see these annotations being used at class
> > > > > level and not at package/folder level.
> > > >
> > > >
> > > > I had a typo in my email; I meant to say "think of this like a folder..."
> > > > as an analogy and not literally.
> > > >
> > > > Thanks
> > > >
> > > >
> > > > > Thanks
> > > > >
> > > > > On Fri, May 27, 2016 at 2:10 PM, Pramod Immaneni <pra...@datatorrent.com>
> > > > > wrote:
> > > > >
> > > > > > On Fri, May 27, 2016 at 1:05 PM, Gaurav Gupta <gaurav.gopi...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Can the same goal not be achieved by using the
> > > > > > > org.apache.hadoop.classification.InterfaceStability.Evolving /
> > > > > > > org.apache.hadoop.classification.InterfaceStability.Unstable
> > > > > > > annotation?
> > > > > > >
> > > > > >
> > > > > > I think it is important to localize the additions in one place so that it
> > > > > > becomes clearer to users about the maturity level of these, easier for
> > > > > > developers to track them towards the path to maturity and also provides a
> > > > > > clearer directive 

[GitHub] apex-malhar pull request #301: [APEXMALHAR-2094] Quantiles operator

2016-06-07 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/301#discussion_r66152395
  
--- Diff: 
sketches/src/main/java/org/apache/apex/malhar/sketches/QuantilesEstimator.java 
---
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sketches;
+
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * An implementation of BaseOperator that computes a "sketch" (a 
representation of the probability distribution using
+ * a low memory footprint) of the incoming numeric data, and 
evaluates/outputs the cumulative distribution function and
+ * quantiles of the probability distribution. Leverages the quantiles 
sketch implementation from the Yahoo Datasketches
+ * Library.
+ * 
+ * Input Port(s) : 
+ * data :  Data values input port. 
+ * 
+ * Output Port(s) :  
+ * cdfOutput : cumulative distribution function output port. 
+ * quantilesOutput : quantiles output port. 
+ * 
+ * Partitions :  No unifier. Merging these sketches is non-trivial. 

+ * +
+ */
+@OperatorAnnotation(partitionable = false)
+public class QuantilesEstimator extends BaseOperator
+{
+  /**
+   * Constructor that allows non-default initialization of the quantile 
sketch object
+   *
+   * @param k:Parameter that determines accuracy and memory usage of 
quantile sketch. See QuantilesSketch
+   *  documentation for details
+   * @param seed: The quantile sketch algorithm is inherently random. Set 
seed to 0 for reproducibility in testing, but
+   *  do not set otherwise.
+   */
+  public QuantilesEstimator(int k, short seed)
+  {
+quantilesSketch = QuantilesSketch.builder().setSeed(seed).build(k);
+  }
+
+  private transient QuantilesSketch quantilesSketch = 
QuantilesSketch.builder().build();
+
+  /**
+   * This field determines the specific quantiles to be calculated. For a 
stream of numbers, the quantile at a value
+   * 0 <= p <= 1 is the number x such that a fraction p of the numbers in 
the sorted stream are less than x. E.g., the
+   * quantile at p = 0.5 is the median (half the numbers in the stream are 
less than the median).
+   * The default is set to compute the standard quartiles (4-quantiles).
+   */
+  private double[] fractions = {0.0, 0.25, 0.50, 0.75, 1.00};
+  /**
+   * This field determines the intervals on which the probability mass 
function is computed.
+   */
+  private double[] pmfIntervals = {};
+
+  /**
+   * This operator computes three different quantities which are output on 
separate output ports. If not using any of
+   * these quantities, these variables can be set to avoid unnecessary 
computation.
+   */
+  private boolean computeCdf = true;
+  private boolean computeQuantiles = true;
+  private boolean computePmf = true;
+
+  public boolean isComputeCdf()
+  {
+return computeCdf;
+  }
+
+  public void setComputeCdf(boolean computeCdf)
+  {
+this.computeCdf = computeCdf;
+  }
+
+  public boolean isComputeQuantiles()
+  {
+return computeQuantiles;
+  }
+
+  public void setComputeQuantiles(boolean computeQuantiles)
+  {
+this.computeQuantiles = computeQuantiles;
+  }
+
+  public boolean isComputePmf()
+  {
+return computePmf;
+  }
+
+  public void setComputePmf(boolean computePmf)
+  {
+this.computePmf = computePmf;
+  }
+
+  public int getK()
+  {
+return quantilesSketch.getK();
+  }
+
+  public double[] getFractions()
+  {
+return fractions;
+  }
+
+  public void setFractions(double[] 

[jira] [Commented] (APEXMALHAR-2094) Quantiles sketch operator

2016-06-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319394#comment-15319394
 ] 

ASF GitHub Bot commented on APEXMALHAR-2094:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/301#discussion_r66150788
  
--- Diff: 
sketches/src/main/java/org/apache/apex/malhar/sketches/QuantilesEstimator.java 
---
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sketches;
+
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * An implementation of BaseOperator that computes a "sketch" (a 
representation of the probability distribution using
+ * a low memory footprint) of the incoming numeric data, and 
evaluates/outputs the cumulative distribution function and
+ * quantiles of the probability distribution. Leverages the quantiles 
sketch implementation from the Yahoo Datasketches
+ * Library.
+ * 
+ * Input Port(s) : 
+ * data :  Data values input port. 
+ * 
+ * Output Port(s) :  
+ * cdfOutput : cumulative distribution function output port. 
+ * quantilesOutput : quantiles output port. 
+ * 
+ * Partitions :  No unifier. Merging these sketches is non-trivial. 

+ * +
+ */
+@OperatorAnnotation(partitionable = false)
+public class QuantilesEstimator extends BaseOperator
+{
+  /**
+   * Constructor that allows non-default initialization of the quantile 
sketch object
+   *
+   * @param k:Parameter that determines accuracy and memory usage of 
quantile sketch. See QuantilesSketch
+   *  documentation for details
+   * @param seed: The quantile sketch algorithm is inherently random. Set 
seed to 0 for reproducibility in testing, but
+   *  do not set otherwise.
+   */
+  public QuantilesEstimator(int k, short seed)
+  {
+quantilesSketch = QuantilesSketch.builder().setSeed(seed).build(k);
+  }
+
+  private transient QuantilesSketch quantilesSketch = 
QuantilesSketch.builder().build();
+
+  /**
+   * This field determines the specific quantiles to be calculated. For a 
stream of numbers, the quantile at a value
+   * 0 <= p <= 1 is the number x such that a fraction p of the numbers in 
the sorted stream are less than x. E.g., the
+   * quantile at p = 0.5 is the median (half the numbers in the stream are 
less than the median).
+   * The default is set to compute the standard quartiles (4-quantiles).
+   */
+  private double[] fractions = {0.0, 0.25, 0.50, 0.75, 1.00};
+  /**
+   * This field determines the intervals on which the probability mass 
function is computed.
+   */
+  private double[] pmfIntervals = {};
+
+  /**
+   * This operator computes three different quantities which are output on 
separate output ports. If not using any of
+   * these quantities, these variables can be set to avoid unnecessary 
computation.
+   */
+  private boolean computeCdf = true;
--- End diff --

You can avoid having these boolean properties by simply checking whether 
the corresponding output port is connected, and only then doing the needed 
computation. Example:
```
if (cdfOutput.isConnected()) {
  // do processing
}
```
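
A slightly fuller sketch of the same idea follows. To keep it runnable without the Apex libraries, `PortStub` is a hypothetical stand-in for an output port that offers `isConnected()` and `emit()`; `ConnectedPortExample` and its `expensive` helper are likewise illustrative and not part of the pull request.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an output port; records emitted tuples for inspection.
class PortStub<T>
{
  private final boolean connected;
  final List<T> emitted = new ArrayList<T>();

  PortStub(boolean connected)
  {
    this.connected = connected;
  }

  boolean isConnected()
  {
    return connected;
  }

  void emit(T tuple)
  {
    emitted.add(tuple);
  }
}

// Hypothetical operator that skips work for ports nobody listens to.
class ConnectedPortExample
{
  final PortStub<Double> cdfOutput;
  final PortStub<Double> quantilesOutput;
  int expensiveCalls = 0;

  ConnectedPortExample(PortStub<Double> cdfOutput, PortStub<Double> quantilesOutput)
  {
    this.cdfOutput = cdfOutput;
    this.quantilesOutput = quantilesOutput;
  }

  void process(double value)
  {
    // Compute and emit only when a downstream operator is attached to the port.
    if (cdfOutput.isConnected()) {
      cdfOutput.emit(expensive(value));
    }
    if (quantilesOutput.isConnected()) {
      quantilesOutput.emit(expensive(value));
    }
  }

  // Placeholder for a costly computation; counts how often it is invoked.
  private double expensive(double value)
  {
    expensiveCalls++;
    return value;
  }
}
```

With this approach the connection state of each port replaces the separate boolean flags, so there is no property that can drift out of sync with the actual wiring.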


> Quantiles sketch operator
> -
>
> Key: APEXMALHAR-2094
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2094
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Sandeep Narayanaswami
>Assignee: Sandeep Narayanaswami
>Priority: Minor
>
> An operator that "sketches" in an online fashion the probability 

[GitHub] apex-malhar pull request #301: [APEXMALHAR-2094] Quantiles operator

2016-06-07 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/301#discussion_r66150788
  
--- Diff: 
sketches/src/main/java/org/apache/apex/malhar/sketches/QuantilesEstimator.java 
---
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sketches;
+
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * An implementation of BaseOperator that computes a "sketch" (a 
representation of the probability distribution using
+ * a low memory footprint) of the incoming numeric data, and 
evaluates/outputs the cumulative distribution function and
+ * quantiles of the probability distribution. Leverages the quantiles 
sketch implementation from the Yahoo Datasketches
+ * Library.
+ * 
+ * Input Port(s) : 
+ * data :  Data values input port. 
+ * 
+ * Output Port(s) :  
+ * cdfOutput : cumulative distribution function output port. 
+ * quantilesOutput : quantiles output port. 
+ * 
+ * Partitions :  No unifier. Merging these sketches is non-trivial. 

+ * +
+ */
+@OperatorAnnotation(partitionable = false)
+public class QuantilesEstimator extends BaseOperator
+{
+  /**
+   * Constructor that allows non-default initialization of the quantile 
sketch object
+   *
+   * @param k:Parameter that determines accuracy and memory usage of 
quantile sketch. See QuantilesSketch
+   *  documentation for details
+   * @param seed: The quantile sketch algorithm is inherently random. Set 
seed to 0 for reproducibility in testing, but
+   *  do not set otherwise.
+   */
+  public QuantilesEstimator(int k, short seed)
+  {
+quantilesSketch = QuantilesSketch.builder().setSeed(seed).build(k);
+  }
+
+  private transient QuantilesSketch quantilesSketch = 
QuantilesSketch.builder().build();
+
+  /**
+   * This field determines the specific quantiles to be calculated. For a 
stream of numbers, the quantile at a value
+   * 0 <= p <= 1 is the number x such that a fraction p of the numbers in 
the sorted stream are less than x. E.g., the
+   * quantile at p = 0.5 is the median (half the numbers in the stream are 
less than the median).
+   * The default is set to compute the standard quartiles (4-quantiles).
+   */
+  private double[] fractions = {0.0, 0.25, 0.50, 0.75, 1.00};
+  /**
+   * This field determines the intervals on which the probability mass 
function is computed.
+   */
+  private double[] pmfIntervals = {};
+
+  /**
+   * This operator computes three different quantities which are output on 
separate output ports. If not using any of
+   * these quantities, these variables can be set to avoid unnecessary 
computation.
+   */
+  private boolean computeCdf = true;
--- End diff --

You can avoid having these boolean properties by simply checking to see if 
the corresponding output port is connected and only then do the needed 
computation. Example:
```
if (cdfOutput.isConnected()) {
  // do processing
}
```
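
To make the suggestion concrete, here is a minimal, self-contained sketch of the port-gating idea. It does not use the Apex classes: SketchOutputPort is a hypothetical stand-in that models only isConnected() and emit(), and the counters exist purely so the effect can be observed. The emitted values are placeholders, not real CDF or quantile results.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: SketchOutputPort is a hypothetical stand-in for an Apex output
// port and models nothing beyond isConnected() and emit().
class PortGatingSketch
{
  static class SketchOutputPort<T>
  {
    private final boolean connected;
    final List<T> emitted = new ArrayList<>();

    SketchOutputPort(boolean connected)
    {
      this.connected = connected;
    }

    boolean isConnected()
    {
      return connected;
    }

    void emit(T tuple)
    {
      emitted.add(tuple);
    }
  }

  final SketchOutputPort<Double> cdfOutput;
  final SketchOutputPort<Double> quantilesOutput;

  // Counters exist only to make the skipped work visible.
  int cdfComputations = 0;
  int quantileComputations = 0;

  PortGatingSketch(boolean cdfConnected, boolean quantilesConnected)
  {
    cdfOutput = new SketchOutputPort<>(cdfConnected);
    quantilesOutput = new SketchOutputPort<>(quantilesConnected);
  }

  void process(double value)
  {
    // No boolean property is needed: the port wiring decides what is computed.
    if (cdfOutput.isConnected()) {
      cdfComputations++;
      cdfOutput.emit(value); // placeholder for the real CDF result
    }
    if (quantilesOutput.isConnected()) {
      quantileComputations++;
      quantilesOutput.emit(value); // placeholder for the real quantile result
    }
  }
}
```

With this shape, an application that wires up only the quantiles port never pays for the CDF computation, and there is no extra property to keep in sync with the DAG.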




[jira] [Commented] (APEXMALHAR-2094) Quantiles sketch operator

2016-06-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319369#comment-15319369
 ] 

ASF GitHub Bot commented on APEXMALHAR-2094:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/301#discussion_r66149667
  
--- Diff: 
sketches/src/main/java/org/apache/apex/malhar/sketches/QuantilesEstimator.java 
---
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sketches;
+
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * An implementation of BaseOperator that computes a "sketch" (a 
representation of the probability distribution using
+ * a low memory footprint) of the incoming numeric data, and 
evaluates/outputs the cumulative distribution function and
+ * quantiles of the probability distribution. Leverages the quantiles 
sketch implementation from the Yahoo Datasketches
+ * Library.
+ * 
+ * Input Port(s) : 
+ * data :  Data values input port. 
+ * 
+ * Output Port(s) :  
+ * cdfOutput : cumulative distribution function output port. 
+ * quantilesOutput : quantiles output port. 
+ * 
+ * Partitions :  No unifier. Merging these sketches is non-trivial. 

+ * +
+ */
+@OperatorAnnotation(partitionable = false)
+public class QuantilesEstimator extends BaseOperator
+{
+  /**
+   * Constructor that allows non-default initialization of the quantile 
sketch object
+   *
+   * @param k:Parameter that determines accuracy and memory usage of 
quantile sketch. See QuantilesSketch
--- End diff --

A link to the documentation would be helpful


> Quantiles sketch operator
> -
>
> Key: APEXMALHAR-2094
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2094
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Sandeep Narayanaswami
>Assignee: Sandeep Narayanaswami
>Priority: Minor
>
> An operator that "sketches" in an online fashion the probability distribution 
> of an input (numeric) data stream, enabling computation of quantiles and 
> cumulative distribution functions.
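
For readers unfamiliar with the two quantities named in the description, the following is a small exact (non-sketching) illustration. It is not the DataSketches algorithm and not the operator under review: it sorts the whole input, which is exactly what a sketch avoids. The class and method names are made up for this example, and the index convention floor(p * n) is one simple choice among several.

```java
import java.util.Arrays;

// Exact reference computation, for illustration only. A sketching operator
// approximates these values without holding the whole stream in memory.
class ExactQuantilesSketch
{
  // Returns the value at fraction p (0 <= p <= 1) of the sorted data.
  static double quantile(double[] values, double p)
  {
    if (values.length == 0 || !(p >= 0.0 && p <= 1.0)) {
      throw new IllegalArgumentException("need non-empty data and 0 <= p <= 1");
    }
    double[] sorted = values.clone();
    Arrays.sort(sorted);
    // Clamp so that p = 1.0 maps to the maximum rather than past the end.
    int index = (int)Math.min(sorted.length - 1, Math.floor(p * sorted.length));
    return sorted[index];
  }

  // Empirical CDF: fraction of the data strictly less than x.
  static double cdf(double[] values, double x)
  {
    if (values.length == 0) {
      throw new IllegalArgumentException("need non-empty data");
    }
    int below = 0;
    for (double v : values) {
      if (v < x) {
        below++;
      }
    }
    return (double)below / values.length;
  }
}
```

For the data {5, 1, 4, 2, 3}, the quantile at p = 0.5 is the median 3, and p = 0.0 and p = 1.0 give the minimum and maximum.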





Re: APC

2016-06-07 Thread Timothy Farkas
Yeah I tried running 3.4.0 on it and was greeted with a nice error message,
had to roll back to 3.3

Thanks,
Tim

On Tue, Jun 7, 2016 at 1:38 PM, Munagala Ramanath 
wrote:

> Also, last I looked, the sandbox was still 3.3.1
>
> Ram
>
> On Tue, Jun 7, 2016 at 1:29 PM, Timothy Farkas <
> timothytiborfar...@gmail.com
> > wrote:
>
> > Thanks Ram, will take a note.
> >
> > Tim
> >
> > On Tue, Jun 7, 2016 at 1:27 PM, Munagala Ramanath 
> > wrote:
> >
> > > The archetype command for each version is in the
> > > *apex-app-archetype/README.md*
> > > file of the appropriate branch. Here is the script I currently use:
> > >
> > > #!/bin/bash
> > >
> > > #v='3.4.0'
> > > v='3.5.0-SNAPSHOT'
> > > mvn archetype:generate -B \
> > >   -DarchetypeGroupId=org.apache.apex \
> > >   -DarchetypeArtifactId=apex-app-archetype \
> > >   -DarchetypeVersion="$v" \
> > >   -DgroupId=com.example \
> > >   -Dpackage=com.example.myapexapp \
> > >   -DartifactId=myapexapp \
> > >   -Dversion=1.0-SNAPSHOT
> > >
> > > Ram
> > >
> > > On Tue, Jun 7, 2016 at 1:20 PM, Timothy Farkas <
> > > timothytiborfar...@gmail.com
> > > > wrote:
> > >
> > > > Ah Thanks Ram. I did a quick google for the archetype, and this came up
> > > >
> > > > https://github.com/apache/apex-core/tree/master/apex-conf-archetype
> > > >
> > > > I then blindly copied the command, which is for config packages. Silly me
> > > > :). Do you have a link to the correct apa archetype command for future
> > > > reference?
> > > >
> > > > Thanks,
> > > > Tim
> > > >
> > > > On Tue, Jun 7, 2016 at 1:15 PM, Munagala Ramanath <r...@datatorrent.com>
> > > > wrote:
> > > >
> > > > > Tim,
> > > > >
> > > > > Are you building a config package by any chance, since that builds an
> > > > > *apc* file.
> > > > > Can you post the exact archetype command you ran ?
> > > > >
> > > > > I just ran the archetype command for 3.4.0 and 3.5.0-SNAPSHOT and it
> > > > > built an *apa* just as it always does.
> > > > >
> > > > > Ram
> > > > >
> > > > > On Tue, Jun 7, 2016 at 12:44 PM, Timothy Farkas <
> > > > > timothytiborfar...@gmail.com> wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > I noticed that the new project template generates an apc, which doesn't
> > > > > > seem to launch in the sandbox I just downloaded. Can I get some hints
> > > > > > about what version works with what, and what the differences between an
> > > > > > apc and the old apa are?
> > > > > >
> > > > > > Thanks!
> > > > > > Tim
> > > > > >
> > > > >
> > > >
> > >
> >
>


[jira] [Commented] (APEXMALHAR-2106) Support merging multiple streams with StreamMerger

2016-06-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319332#comment-15319332
 ] 

ASF GitHub Bot commented on APEXMALHAR-2106:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/309#discussion_r66146875
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java ---
@@ -59,6 +61,7 @@ public void process(K tuple)
   /**
* Data input port 2.
*/
+  @InputPortFieldAnnotation(optional = true)
--- End diff --

Same here


> Support merging multiple streams with StreamMerger 
> ---
>
> Key: APEXMALHAR-2106
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2106
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Ilya Ganelin
>Assignee: Ilya Ganelin
>
> To properly implement the Flatten transformation (and other Stream 
> combination operations), Apex must support merging data from multiple 
> sources. The StreamMerger operator can be improved to merge multiple streams, 
> rather than just the two streams it can handle in the present implementation. 





[jira] [Commented] (APEXMALHAR-2106) Support merging multiple streams with StreamMerger

2016-06-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319330#comment-15319330
 ] 

ASF GitHub Bot commented on APEXMALHAR-2106:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/309#discussion_r66146828
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java ---
@@ -44,6 +45,7 @@
   /**
* Data input port 1.
*/
+  @InputPortFieldAnnotation(optional = true)
--- End diff --

This won't be needed if we don't use mergers with just one input. See 
comment above on number of mergers.
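
The point about single-input mergers can be illustrated with a small planning sketch: pair streams tier by tier and carry an unpaired stream forward unchanged, so every merger always has exactly two inputs and N sources need N - 1 mergers. This is only an illustration of that counting argument, not the code in the pull request; the class name, the "Merger_n" naming, and the use of plain strings instead of ports are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: streams and mergers are represented by names, not ports.
class MergeTreePlanSketch
{
  // Each entry is {mergerName, leftInput, rightInput}.
  static List<String[]> planMergers(List<String> sources)
  {
    if (sources.size() < 2) {
      throw new IllegalArgumentException("at least two streams are required");
    }
    List<String[]> mergers = new ArrayList<>();
    List<String> tier = new ArrayList<>(sources);
    while (tier.size() > 1) {
      List<String> next = new ArrayList<>();
      // Pair adjacent streams; each pair feeds one two-input merger.
      for (int i = 0; i + 1 < tier.size(); i += 2) {
        String name = "Merger_" + (mergers.size() + 1);
        mergers.add(new String[] {name, tier.get(i), tier.get(i + 1)});
        next.add(name);
      }
      // An odd stream is carried to the next tier instead of getting
      // a merger with only one connected input.
      if (tier.size() % 2 == 1) {
        next.add(tier.get(tier.size() - 1));
      }
      tier = next;
    }
    return mergers;
  }
}
```

For three sources this yields Merger_1(S1, S2) followed by Merger_2(Merger_1, S3), so neither input port would need to be marked optional.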


> Support merging multiple streams with StreamMerger 
> ---
>
> Key: APEXMALHAR-2106
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2106
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Ilya Ganelin
>Assignee: Ilya Ganelin
>
> To properly implement the Flatten transformation (and other Stream 
> combination operations), Apex must support merging data from multiple 
> sources. The StreamMerger operator can be improved to merge multiple streams, 
> rather than just the two streams it can handle in the present implementation. 





[jira] [Assigned] (APEXMALHAR-2085) Implement Windowed Operators

2016-06-07 Thread David Yan (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Yan reassigned APEXMALHAR-2085:
-

Assignee: David Yan  (was: Siyuan Hua)

> Implement Windowed Operators
> 
>
> Key: APEXMALHAR-2085
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2085
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Siyuan Hua
>Assignee: David Yan
>
> As per several recent discussions in the community, a group of Windowed 
> Operators that delivers window semantics following the Google Dataflow 
> model (https://cloud.google.com/dataflow/) is very important. 
> The operators should be designed and implemented to support:
> * High-level API
> * Beam translation
> * Ease of use with other popular operators
> {panel:title=Operator Hierarchy}
> Hierarchy of the operators:
> The windowed operators should cover all possible transformations that require 
> a window; batch processing is also considered a special window called the 
> global window.
> {code}
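>                         +------------------+
>                         | WindowedOperator |
>                         +--------+---------+
>                                  ^
>         +----------------+-------+-------+---------------+
>         |                |               |               |
> +-------+-------+ +------+------+ +------+------+ +------+-----+
> |CombineOperator| |GroupOperator| |KeyedOperator| |JoinOperator|
> +---------------+ +-------------+ +------+------+ +------+-----+
>                                          ^               ^
>                                  +-------+-----+         |
>                                  |             |         |
>                           +------+-----+ +-----+----+ +--+------+
>                           |KeyedCombine| |KeyedGroup| | CoGroup |
>                           +------------+ +----------+ +---------+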
> The Combine operation includes all operations that combine all tuples in one 
> window into one or a small number of tuples. The Group operation groups all 
> tuples in one window. Join and CoGroup are used to join and group tuples from 
> different inputs.
> {panel}
> {panel:title=Components}
> * Window Component
> It includes configuration, window state that should be checkpointed, etc. It 
> should support NonMergibleWindow (fixed or sliding) and MergibleWindow (session).
> * Trigger
> It should support early and late triggers with customizable trigger 
> behaviour.
> * Other related components:
> ** Watermark generator, which can be plugged into an input source to generate watermarks
> ** Tuple schema support:
> It should handle either a predefined tuple type or give a declarative API to 
> describe the user-defined tuple class
> {panel}
> Most component APIs should be reused in the High-Level API.
> This is the umbrella ticket; separate tickets will be created for the different 
> components and operators.
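
As a rough illustration of the non-mergeable window kinds mentioned in the ticket, the sketch below assigns an event timestamp to fixed and sliding windows, identifying each window by its start time. This is an assumption-laden example, not a proposed API: the class and method names are invented here, windows are taken to be half-open intervals [start, start + size), and session (mergeable) windows are not covered.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: windows are identified by their start timestamp and are
// assumed to cover the half-open interval [start, start + size).
class WindowAssignmentSketch
{
  // A fixed window: every timestamp belongs to exactly one window.
  static long fixedWindowStart(long timestamp, long size)
  {
    return timestamp - Math.floorMod(timestamp, size);
  }

  // A sliding window: a timestamp may belong to several overlapping windows.
  // Starts are returned from the latest window to the earliest.
  static List<Long> slidingWindowStarts(long timestamp, long size, long slide)
  {
    List<Long> starts = new ArrayList<>();
    long latest = timestamp - Math.floorMod(timestamp, slide);
    for (long start = latest; start > timestamp - size; start -= slide) {
      starts.add(start);
    }
    return starts;
  }
}
```

With size 10 and slide 5, timestamp 7 falls into the windows starting at 5 and at 0. A fixed window is the special case where the slide equals the size.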





[jira] [Commented] (APEXMALHAR-2106) Support merging multiple streams with StreamMerger

2016-06-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319288#comment-15319288
 ] 

ASF GitHub Bot commented on APEXMALHAR-2106:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/309#discussion_r66144530
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/stream/MultipleStreamMerger.java ---
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.stream;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Module;
+
+/**
+ * Module that adds functionality to bypass the platform limitations of 
combining more than two streams at a time with
+ * Stream Merger.
+ *
+ * Usage:
+ *
+ * dag.addOperator("Stream_1", op1);
+ * dag.addOperator("Stream_2", op2);
+ * dag.addOperator("Stream_3", op3);
+ *
+ * MultipleStreamMerger merger = new MultipleStreamMerger();
+ * merger.merge(op1.out)
+ * .merge(op2.out)
+ * .merge(op3.out)
+ * .insertInto(dag, conf);
+ *
+ * dag.addModule("Merger", merger);
+ *
+ * @param 
+ */
+public class MultipleStreamMerger implements Module
+{
+  public class Stream
+  {
+DefaultInputPort destPort;
+DefaultOutputPort sourcePort;
+String name;
+
+public Stream(String name, DefaultOutputPort sourcePort, 
DefaultInputPort destPort)
+{
+  this.destPort = destPort;
+  this.sourcePort = sourcePort;
+  this.name = name;
+}
+  }
+
+  public class NamedMerger
+  {
+StreamMerger merger;
+String name;
+
+public NamedMerger(String name, StreamMerger merger)
+{
+  this.merger = merger;
+  this.name = name;
+}
+  }
+
+  private int streamCount = 0;
+
+  ArrayList streamsToMerge = new ArrayList<>();
+
+  public transient ProxyOutputPort streamOutput = new 
ProxyOutputPort<>();
+
+  /**
+   * Used to define all the sources to be merged into a single stream.
+   *
+   * @param sourcePort - The output port from the upstream operator that 
provides data
+   * @return The updated MultipleStreamMerger object that tracks which 
streams should be unified.
+   */
+  public MultipleStreamMerger merge(DefaultOutputPort sourcePort)
+  {
+streamsToMerge.add(sourcePort);
+return this;
+  }
+
+  /**
+   * To merge more than two streams at a time, we construct a binary tree 
of thread-local StreamMerger operators
+   * E.g.
+   *
+   * Tier 0  Tier 1  Tier 2
+   *
+   * Stream 1 ->
+   * StreamMerger_1 ->
+   * Stream 2 ->
+   * StreamMerger_Final -> Out
+   * Stream 3 ->
+   * StreamMerger_2 ->
+   * Stream 4 ->
+   *
+   * This function updates the provided DAG with the relevant streams.
+   */
+  public void mergeStreams(DAG dag, Configuration conf)
+  {
+if (streamsToMerge.size() < 2) {
+  throw new IllegalArgumentException("Not enough streams to merge, at 
least two streams must be selected for " +
+  "merging with `.merge()`.");
+}
+
+ArrayList streamsToAddToDag = new ArrayList<>();
+ArrayList operatorsToAdd = new ArrayList<>();
+
+// Determine operators and streams to add to the DAG
+constructMergeTree(streamsToAddToDag, operatorsToAdd);
+
+for (NamedMerger m : operatorsToAdd) {
+  dag.addOperator(m.name, m.merger);
+}
+
+for (Stream s : streamsToAddToDag) {
+  dag.addStream(s.name, s.sourcePort, 

[GitHub] apex-malhar pull request #309: [APEXMALHAR-2106][WIP] Support multiple strea...

2016-06-07 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/309#discussion_r66144530
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/stream/MultipleStreamMerger.java ---
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.stream;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Module;
+
+/**
+ * Module that adds functionality to bypass the platform limitations of 
combining more than two streams at a time with
+ * Stream Merger.
+ *
+ * Usage:
+ *
+ * dag.addOperator("Stream_1", op1);
+ * dag.addOperator("Stream_2", op2);
+ * dag.addOperator("Stream_3", op3);
+ *
+ * MultipleStreamMerger merger = new MultipleStreamMerger();
+ * merger.merge(op1.out)
+ * .merge(op2.out)
+ * .merge(op3.out)
+ * .insertInto(dag, conf);
+ *
+ * dag.addModule("Merger", merger);
+ *
+ * @param 
+ */
+public class MultipleStreamMerger implements Module
+{
+  public class Stream
+  {
+DefaultInputPort destPort;
+DefaultOutputPort sourcePort;
+String name;
+
+public Stream(String name, DefaultOutputPort sourcePort, 
DefaultInputPort destPort)
+{
+  this.destPort = destPort;
+  this.sourcePort = sourcePort;
+  this.name = name;
+}
+  }
+
+  public class NamedMerger
+  {
+StreamMerger merger;
+String name;
+
+public NamedMerger(String name, StreamMerger merger)
+{
+  this.merger = merger;
+  this.name = name;
+}
+  }
+
+  private int streamCount = 0;
+
+  ArrayList streamsToMerge = new ArrayList<>();
+
+  public transient ProxyOutputPort streamOutput = new 
ProxyOutputPort<>();
+
+  /**
+   * Used to define all the sources to be merged into a single stream.
+   *
+   * @param sourcePort - The output port from the upstream operator that 
provides data
+   * @return The updated MultipleStreamMerger object that tracks which 
streams should be unified.
+   */
+  public MultipleStreamMerger merge(DefaultOutputPort sourcePort)
+  {
+streamsToMerge.add(sourcePort);
+return this;
+  }
+
+  /**
+   * To merge more than two streams at a time, we construct a binary tree 
of thread-local StreamMerger operators
+   * E.g.
+   *
+   * Tier 0  Tier 1  Tier 2
+   *
+   * Stream 1 ->
+   * StreamMerger_1 ->
+   * Stream 2 ->
+   * StreamMerger_Final -> Out
+   * Stream 3 ->
+   * StreamMerger_2 ->
+   * Stream 4 ->
+   *
+   * This function updates the provided DAG with the relevant streams.
+   */
+  public void mergeStreams(DAG dag, Configuration conf)
+  {
+if (streamsToMerge.size() < 2) {
+  throw new IllegalArgumentException("Not enough streams to merge, at 
least two streams must be selected for " +
+  "merging with `.merge()`.");
+}
+
+ArrayList streamsToAddToDag = new ArrayList<>();
+ArrayList operatorsToAdd = new ArrayList<>();
+
+// Determine operators and streams to add to the DAG
+constructMergeTree(streamsToAddToDag, operatorsToAdd);
+
+for (NamedMerger m : operatorsToAdd) {
+  dag.addOperator(m.name, m.merger);
+}
+
+for (Stream s : streamsToAddToDag) {
+  dag.addStream(s.name, s.sourcePort, 
s.destPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
+}
+  }
+
+  /**
+   * Given a set of streams to be merged (defined via {@link 
#merge(DefaultOutputPort)}), compute the optimal
+   * structure of cascading mergers that need to be instantiated, added 

Re: APC

2016-06-07 Thread Munagala Ramanath
Tim,

Are you building a config package by any chance, since that builds an *apc*
file.
Can you post the exact archetype command you ran ?

I just ran the archetype command for 3.4.0 and 3.5.0-SNAPSHOT and it built
an *apa* just as it always does.

Ram

On Tue, Jun 7, 2016 at 12:44 PM, Timothy Farkas <
timothytiborfar...@gmail.com> wrote:

> Hi All,
>
> I noticed that the new project template generates an apc, which doesn't seem
> to launch in the sandbox I just downloaded. Can I get some hints about what
> version works with what, and what the differences between an apc and the
> old apa are?
>
> Thanks!
> Tim
>


Re: Proposal : DAG - SetOperatorAttribute

2016-06-07 Thread Sandesh Hegde
I have created a Jira, will start working on this.

https://issues.apache.org/jira/browse/APEXCORE-470



On Tue, Jun 7, 2016 at 12:21 PM Munagala Ramanath 
wrote:

> +1
>
> Since we have *setInputPortAttribute* and *setOutputPortAttribute*, it
> seems reasonable
> to add *setOperatorAttribute*.
>
> Ram
>
> On Mon, Jun 6, 2016 at 1:39 PM, Sandesh Hegde 
> wrote:
>
> > Currently, *setAttribute* is used to set operator attributes. The other two
> > attribute-setting APIs are specific to input ports
> > (*setInputPortAttribute*) and output ports (*setOutputPortAttribute*).
> >
> > The proposal is to add a *setOperatorAttribute*
> > API, which clearly indicates that the user wants to set attributes on the
> > operator:
> > setOperatorAttribute(Operator operator, Attribute<T> key, T value)
> >
> > The APIs would then have the following roles:
> > *setAttribute* --> for setting attributes on the whole DAG (the overload
> > setAttribute(Operator operator, Attribute<T> key, T value) can be
> > deprecated)
> > *setOperatorAttribute* --> for setting attributes on the operator
> >
> > Let me know your thoughts.
> >
> > Thanks
> >
>
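
One way to picture the proposed split is the sketch below. It is not the Apex DAG class: SketchDag, Attribute, Operator and the getter methods are simplified, hypothetical stand-ins, and "MEMORY_MB" in the usage is just an illustrative attribute name. The only idea carried over from the proposal is that the operator-level setAttribute overload is deprecated and delegates to the new setOperatorAttribute, while the DAG-level setAttribute stays.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: simplified stand-ins, not the real com.datatorrent.api types.
class OperatorAttributeSketch
{
  static final class Attribute<T>
  {
    final String name;

    Attribute(String name)
    {
      this.name = name;
    }
  }

  interface Operator
  {
  }

  static class SketchDag
  {
    private final Map<Attribute<?>, Object> dagAttributes = new HashMap<>();
    private final Map<Operator, Map<Attribute<?>, Object>> operatorAttributes = new HashMap<>();

    // DAG-level attribute.
    <T> void setAttribute(Attribute<T> key, T value)
    {
      dagAttributes.put(key, value);
    }

    // Old operator-level overload, kept for compatibility and delegating.
    @Deprecated
    <T> void setAttribute(Operator operator, Attribute<T> key, T value)
    {
      setOperatorAttribute(operator, key, value);
    }

    // New, explicitly named operator-level setter.
    <T> void setOperatorAttribute(Operator operator, Attribute<T> key, T value)
    {
      operatorAttributes.computeIfAbsent(operator, k -> new HashMap<>()).put(key, value);
    }

    @SuppressWarnings("unchecked")
    <T> T getAttribute(Attribute<T> key)
    {
      return (T)dagAttributes.get(key);
    }

    @SuppressWarnings("unchecked")
    <T> T getOperatorAttribute(Operator operator, Attribute<T> key)
    {
      Map<Attribute<?>, Object> attrs = operatorAttributes.get(operator);
      return attrs == null ? null : (T)attrs.get(key);
    }
  }
}
```

Existing callers of the three-argument setAttribute keep working and get a deprecation warning, while new code reads unambiguously at the call site.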


[jira] [Commented] (APEXMALHAR-2106) Support merging multiple streams with StreamMerger

2016-06-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319260#comment-15319260
 ] 

ASF GitHub Bot commented on APEXMALHAR-2106:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/309#discussion_r66142732
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/stream/MultipleStreamMerger.java ---
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.stream;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Module;
+
+/**
+ * Module that adds functionality to bypass the platform limitations of 
combining more than two streams at a time with
+ * Stream Merger.
+ *
+ * Usage:
+ *
+ * dag.addOperator("Stream_1", op1);
+ * dag.addOperator("Stream_2", op2);
+ * dag.addOperator("Stream_3", op3);
+ *
+ * MultipleStreamMerger merger = new MultipleStreamMerger();
+ * merger.merge(op1.out)
+ * .merge(op2.out)
+ * .merge(op3.out)
+ * .insertInto(dag, conf);
+ *
+ * dag.addModule("Merger", merger);
+ *
+ * @param 
+ */
+public class MultipleStreamMerger implements Module
+{
+  public class Stream
+  {
+DefaultInputPort destPort;
+DefaultOutputPort sourcePort;
+String name;
+
+public Stream(String name, DefaultOutputPort sourcePort, 
DefaultInputPort destPort)
+{
+  this.destPort = destPort;
+  this.sourcePort = sourcePort;
+  this.name = name;
+}
+  }
+
+  public class NamedMerger
+  {
+StreamMerger merger;
+String name;
+
+public NamedMerger(String name, StreamMerger merger)
+{
+  this.merger = merger;
+  this.name = name;
+}
+  }
+
+  private int streamCount = 0;
+
+  ArrayList streamsToMerge = new ArrayList<>();
+
+  public transient ProxyOutputPort streamOutput = new 
ProxyOutputPort<>();
+
+  /**
+   * Used to define all the sources to be merged into a single stream.
+   *
+   * @param sourcePort - The output port from the upstream operator that 
provides data
+   * @return The updated MultipleStreamMerger object that tracks which 
streams should be unified.
+   */
+  public MultipleStreamMerger merge(DefaultOutputPort sourcePort)
+  {
+streamsToMerge.add(sourcePort);
+return this;
+  }
+
+  /**
+   * To merge more than two streams at a time, we construct a binary tree 
of thread-local StreamMerger operators
+   * E.g.
+   *
+   * Tier 0  Tier 1  Tier 2
+   *
+   * Stream 1 ->
+   * StreamMerger_1 ->
+   * Stream 2 ->
+   * StreamMerger_Final -> Out
+   * Stream 3 ->
+   * StreamMerger_2 ->
+   * Stream 4 ->
+   *
+   * This function updates the provided DAG with the relevant streams.
+   */
+  public void mergeStreams(DAG dag, Configuration conf)
+  {
+if (streamsToMerge.size() < 2) {
+  throw new IllegalArgumentException("Not enough streams to merge, at 
least two streams must be selected for " +
+  "merging with `.merge()`.");
+}
+
+ArrayList streamsToAddToDag = new ArrayList<>();
+ArrayList operatorsToAdd = new ArrayList<>();
+
+// Determine operators and streams to add to the DAG
+constructMergeTree(streamsToAddToDag, operatorsToAdd);
+
+for (NamedMerger m : operatorsToAdd) {
+  dag.addOperator(m.name, m.merger);
+}
+
+for (Stream s : streamsToAddToDag) {
+  dag.addStream(s.name, s.sourcePort, 

[GitHub] apex-malhar pull request #309: [APEXMALHAR-2106][WIP] Support multiple strea...

2016-06-07 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/309#discussion_r66142732
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/stream/MultipleStreamMerger.java ---
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.stream;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Module;
+
+/**
+ * Module that adds functionality to bypass the platform limitations of 
combining more than two streams at a time with
+ * Stream Merger.
+ *
+ * Usage:
+ *
+ * dag.addOperator("Stream_1", op1);
+ * dag.addOperator("Stream_2", op2);
+ * dag.addOperator("Stream_3", op3);
+ *
+ * MultipleStreamMerger merger = new MultipleStreamMerger();
+ * merger.merge(op1.out)
+ * .merge(op2.out)
+ * .merge(op3.out)
+ * .insertInto(dag, conf);
+ *
+ * dag.addModule("Merger", merger);
+ *
+ * @param 
+ */
+public class MultipleStreamMerger implements Module
+{
+  public class Stream
+  {
+DefaultInputPort destPort;
+DefaultOutputPort sourcePort;
+String name;
+
+public Stream(String name, DefaultOutputPort sourcePort, 
DefaultInputPort destPort)
+{
+  this.destPort = destPort;
+  this.sourcePort = sourcePort;
+  this.name = name;
+}
+  }
+
+  public class NamedMerger
+  {
+StreamMerger merger;
+String name;
+
+public NamedMerger(String name, StreamMerger merger)
+{
+  this.merger = merger;
+  this.name = name;
+}
+  }
+
+  private int streamCount = 0;
+
+  ArrayList streamsToMerge = new ArrayList<>();
+
+  public transient ProxyOutputPort streamOutput = new 
ProxyOutputPort<>();
+
+  /**
+   * Used to define all the sources to be merged into a single stream.
+   *
+   * @param sourcePort - The output port from the upstream operator that 
provides data
+   * @return The updated MultipleStreamMerger object that tracks which 
streams should be unified.
+   */
+  public MultipleStreamMerger merge(DefaultOutputPort sourcePort)
+  {
+streamsToMerge.add(sourcePort);
+return this;
+  }
+
+  /**
+   * To merge more than two streams at a time, we construct a binary tree of thread-local StreamMerger operators
+   * E.g.
+   *
+   * Tier 0          Tier 1               Tier 2
+   *
+   * Stream 1 ->
+   *                 StreamMerger_1 ->
+   * Stream 2 ->
+   *                                      StreamMerger_Final -> Out
+   * Stream 3 ->
+   *                 StreamMerger_2 ->
+   * Stream 4 ->
+   *
+   * This function updates the provided DAG with the relevant streams.
+   */
+  public void mergeStreams(DAG dag, Configuration conf)
+  {
+if (streamsToMerge.size() < 2) {
+  throw new IllegalArgumentException("Not enough streams to merge, at least two streams must be selected for " +
+  "merging with `.merge()`.");
+}
+
+ArrayList streamsToAddToDag = new ArrayList<>();
+ArrayList operatorsToAdd = new ArrayList<>();
+
+// Determine operators and streams to add to the DAG
+constructMergeTree(streamsToAddToDag, operatorsToAdd);
+
+for (NamedMerger m : operatorsToAdd) {
+  dag.addOperator(m.name, m.merger);
+}
+
+for (Stream s : streamsToAddToDag) {
+  dag.addStream(s.name, s.sourcePort, s.destPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
+}
+  }
+
+  /**
+   * Given a set of streams to be merged (defined via {@link 
#merge(DefaultOutputPort)}), compute the optimal
+   * structure of cascading mergers that need to be instantiated, added 

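The cascading merge described in the Javadoc of the diff above can be sketched without any Apex classes. This is only an illustration under stated assumptions: plain strings stand in for ports, every merger takes exactly two inputs, and an unpaired stream is carried over to the next tier unchanged. The class name MergeTreeSketch, the nested Merger type and the generated merger names are hypothetical; the part of the pull request that actually builds the tree is cut off in this archive.

```java
import java.util.ArrayList;
import java.util.List;

final class MergeTreeSketch
{
  /** One two-input merger: a name plus the two inputs it combines. */
  static final class Merger
  {
    final String name;
    final String left;
    final String right;

    Merger(String name, String left, String right)
    {
      this.name = name;
      this.left = left;
      this.right = right;
    }

    @Override
    public String toString()
    {
      return name + "(" + left + ", " + right + ")";
    }
  }

  /**
   * Pairs inputs tier by tier until a single output remains.
   * An unpaired input is carried over to the next tier unchanged.
   */
  static List<Merger> plan(List<String> streams)
  {
    if (streams.size() < 2) {
      throw new IllegalArgumentException("At least two streams are required");
    }
    List<Merger> mergers = new ArrayList<>();
    List<String> current = new ArrayList<>(streams);
    int tier = 1;
    while (current.size() > 1) {
      List<String> next = new ArrayList<>();
      for (int i = 0; i + 1 < current.size(); i += 2) {
        // The output of this merger becomes an input of the next tier.
        String name = "Merger_T" + tier + "_" + (i / 2);
        mergers.add(new Merger(name, current.get(i), current.get(i + 1)));
        next.add(name);
      }
      if (current.size() % 2 == 1) {
        // Odd stream out: no partner in this tier, so pass it along.
        next.add(current.get(current.size() - 1));
      }
      current = next;
      tier++;
    }
    return mergers;
  }

  public static void main(String[] args)
  {
    List<String> streams = new ArrayList<>();
    for (int i = 1; i <= 4; i++) {
      streams.add("Stream_" + i);
    }
    for (Merger m : plan(streams)) {
      System.out.println(m);
    }
  }
}
```

With this pairing, n input streams always need n - 1 two-input mergers, and the last merger in the returned list is the one that feeds the final output.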
[jira] [Created] (APEXCORE-470) New Api for setting the attribute on the operator ( setOperatorAttribute )

2016-06-07 Thread Sandesh (JIRA)
Sandesh created APEXCORE-470:


 Summary: New Api for setting the attribute on the operator ( 
setOperatorAttribute )
 Key: APEXCORE-470
 URL: https://issues.apache.org/jira/browse/APEXCORE-470
 Project: Apache Apex Core
  Issue Type: Improvement
Reporter: Sandesh
Assignee: Sandesh


Currently, *setAttribute* is used to set operator attributes. The other two 
attribute-setting APIs are specific to input ports (*setInputPortAttribute*) 
and output ports (*setOutputPortAttribute*).

The proposal is to add a *setOperatorAttribute* API, which clearly indicates 
that the user wants to set an attribute on the operator.
( setOperatorAttribute(Operator operator, Attribute<T> key, T value) )

The roles of the APIs will then be:
*setAttribute* --> sets attributes for the whole DAG (the overload 
setAttribute(Operator operator, Attribute<T> key, T value) can be deprecated)
*setOperatorAttribute* --> sets attributes for the operator


All unit tests that use the previous API will be updated as part of 
this change.
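
A minimal sketch of how the rename could look, assuming the old operator-level overload is kept as a deprecated forwarder. Everything here is a hypothetical stand-in (SketchDag, Attribute, Operator and the attribute name MEMORY_MB are invented for illustration); it does not reproduce the real com.datatorrent.api.DAG class.

```java
import java.util.HashMap;
import java.util.Map;

final class OperatorAttributeSketch
{
  /** Hypothetical stand-in for an attribute key; not the Apex Attribute class. */
  static final class Attribute<T>
  {
    final String name;

    Attribute(String name)
    {
      this.name = name;
    }
  }

  /** Hypothetical stand-in for an operator. */
  static final class Operator
  {
    final String name;

    Operator(String name)
    {
      this.name = name;
    }
  }

  /** Hypothetical DAG that keeps DAG-level and operator-level attributes apart. */
  static final class SketchDag
  {
    final Map<Attribute<?>, Object> dagAttributes = new HashMap<>();
    final Map<Operator, Map<Attribute<?>, Object>> operatorAttributes = new HashMap<>();

    /** Sets an attribute on the whole DAG. */
    <T> void setAttribute(Attribute<T> key, T value)
    {
      dagAttributes.put(key, value);
    }

    /** Old operator-level overload: kept for compatibility, forwards to the new name. */
    @Deprecated
    <T> void setAttribute(Operator operator, Attribute<T> key, T value)
    {
      setOperatorAttribute(operator, key, value);
    }

    /** Proposed explicit name for setting an attribute on one operator. */
    <T> void setOperatorAttribute(Operator operator, Attribute<T> key, T value)
    {
      operatorAttributes.computeIfAbsent(operator, o -> new HashMap<>()).put(key, value);
    }
  }

  public static void main(String[] args)
  {
    SketchDag dag = new SketchDag();
    Operator op = new Operator("op1");
    Attribute<Integer> memory = new Attribute<>("MEMORY_MB");

    dag.setOperatorAttribute(op, memory, 1024);
    System.out.println(dag.operatorAttributes.get(op).get(memory));
  }
}
```

Forwarding from the deprecated overload keeps existing callers working while the method name at each call site states whether the DAG or a single operator is being configured.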





APC

2016-06-07 Thread Timothy Farkas
Hi All,

I noticed that the new project template generates an apc, which doesn't seem to
launch in the sandbox I just downloaded. Can I get some hints about which
versions work together, and what the differences between an apc and the
old apa are?

Thanks!
Tim


Re: Proposal : DAG - SetOperatorAttribute

2016-06-07 Thread Munagala Ramanath
+1

Since we have *setInputPortAttribute* and *setOutputPortAttribute*, it
seems reasonable to add *setOperatorAttribute*.

Ram

On Mon, Jun 6, 2016 at 1:39 PM, Sandesh Hegde 
wrote:

> Currently, *setAttribute* is used to set operator attributes. The other two
> attribute-setting APIs are specific to input ports
> (*setInputPortAttribute*) and output ports (*setOutputPortAttribute*).
>
> The proposal is to add a *setOperatorAttribute*
> API, which clearly indicates that the user wants to set an attribute on the
> operator.
> ( setOperatorAttribute(Operator operator, Attribute<T> key, T value) )
>
> The roles of the APIs will then be:
> *setAttribute* --> sets attributes for the whole DAG (the overload
> setAttribute(Operator operator, Attribute<T> key, T value) can be
> deprecated)
> *setOperatorAttribute* --> sets attributes for the operator
>
> Let me know your thoughts.
>
> Thanks
>


[GitHub] apex-malhar pull request #313: Apexmalhar 2113 jdbc pojo output operator

2016-06-07 Thread devtagare
Github user devtagare closed the pull request at:

https://github.com/apache/apex-malhar/pull/313




[jira] [Commented] (APEXMALHAR-2086) Kafka Output Operator with Kafka 0.9 API

2016-06-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319133#comment-15319133
 ] 

ASF GitHub Bot commented on APEXMALHAR-2086:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/298#discussion_r66132271
  
--- Diff: 
kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java
 ---
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.util.Properties;
+import javax.validation.constraints.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+public abstract class AbstractKafkaOutputOperator implements Operator
+{
+  @SuppressWarnings("unused")
+  private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaOutputOperator.class);
--- End diff --

Can you remove the logger if it is unused?


> Kafka Output Operator with Kafka 0.9 API
> 
>
> Key: APEXMALHAR-2086
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2086
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Sandesh
>Assignee: Sandesh
>
> Goal: two operators for Kafka output
>   1. Simple Kafka Output Operator
>      - Supports at-least-once delivery
>      - Exposes the most-used producer properties as class properties
>   2. Exactly-Once Kafka Output (not possible in all cases; will be
>      documented later)
>
> Design for Exactly Once
> Window Data Manager - stores the Kafka partition offsets.
> Kafka key - used by the operator; set to AppID#OperatorId.
> During recovery, the partially written window is re-created using the
> following approach:
> Tuples between the largest recovery offset and the current offset are
> checked. Based on the key, tuples written by other entities are
> discarded.
> Only tuples that are not in the recovered set are emitted.
> Tuples need to be unique within the window.
>   
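
The recovery approach in the issue description can be sketched as a plain filtering step. This is an illustration only, not the operator's implementation: the Record type stands in for whatever is read back from the topic between the recovery offset and the current offset, tuples are modelled as strings, and all class and method names are hypothetical. No Kafka client calls are used.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class ExactlyOnceRecoverySketch
{
  /** Hypothetical stand-in for a record read back from the topic. */
  static final class Record
  {
    final String key;
    final String value;

    Record(String key, String value)
    {
      this.key = key;
      this.value = value;
    }
  }

  /**
   * Collects what this operator already wrote in the partially written window.
   * Records whose key belongs to another writer are ignored.
   */
  static Set<String> recoveredValues(List<Record> readBack, String operatorKey)
  {
    Set<String> recovered = new HashSet<>();
    for (Record r : readBack) {
      if (operatorKey.equals(r.key)) {
        recovered.add(r.value);
      }
    }
    return recovered;
  }

  /** Returns the tuples of the replayed window that still have to be written. */
  static List<String> tuplesToEmit(List<String> replayedWindow, Set<String> recovered)
  {
    List<String> toEmit = new ArrayList<>();
    for (String tuple : replayedWindow) {
      if (!recovered.contains(tuple)) {
        toEmit.add(tuple);
      }
    }
    return toEmit;
  }

  public static void main(String[] args)
  {
    String operatorKey = "app1#2"; // AppID#OperatorId, as in the design notes
    List<Record> readBack = new ArrayList<>();
    readBack.add(new Record("app1#2", "a"));
    readBack.add(new Record("app9#1", "b")); // written by another entity
    readBack.add(new Record("app1#2", "c"));

    List<String> replayed = new ArrayList<>();
    replayed.add("a");
    replayed.add("b");
    replayed.add("c");
    replayed.add("d");

    System.out.println(tuplesToEmit(replayed, recoveredValues(readBack, operatorKey)));
  }
}
```

Because the recovered tuples are held in a set and compared by value, a duplicate inside one window could not be told apart from a tuple that was already written, which is why the design requires tuples to be unique within the window.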





[GitHub] apex-malhar pull request #298: *For review only* [APEXMALHAR-2086] Kafka out...

2016-06-07 Thread chinmaykolhatkar
Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/298#discussion_r66131514
  
--- Diff: 
kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java
 ---
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.util.Properties;
+import javax.validation.constraints.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+public abstract class AbstractKafkaOutputOperator implements Operator
+{
+  @SuppressWarnings("unused")
+  private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaOutputOperator.class);
+  private transient Producer producer;  // K is key partitioner, V is value type
+  @NotNull
+  private String topic = "topic1";
+  private String producerProperties = "";
+  private Properties configProperties = new Properties();
+  public Properties getConfigProperties()
+  {
+return configProperties;
+  }
+
+  public void setConfigProperties(Properties configProperties)
+  {
+this.configProperties.putAll(configProperties);
+  }
+
+  /**
+   * setup producer configuration.
+   * @return ProducerConfig
+   */
+  protected Properties createKafkaProducerConfig()
+  {
+Properties prop = new Properties();
+for (String propString : producerProperties.split(",")) {
+  if (!propString.contains("=")) {
+continue;
+  }
+  String[] keyVal = StringUtils.trim(propString).split("=");
+  prop.put(StringUtils.trim(keyVal[0]), StringUtils.trim(keyVal[1]));
+}
+
+configProperties.putAll(prop);
+
+return configProperties;
+  }
+
+  public Producer getProducer()
--- End diff --

Is there any reason this is public? It is being accessed from the subclass, so 
it can be made protected.
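
A small sketch of the visibility change suggested in this comment, using hypothetical stand-in classes (FakeProducer, AbstractOutput, SinglePortOutput) rather than the Kafka client or the operators from the pull request. It only shows that a subclass can still reach the producer once the getter is protected.

```java
import java.util.ArrayList;
import java.util.List;

final class ProducerVisibilitySketch
{
  /** Hypothetical stand-in for the Kafka producer; it just records what was sent. */
  static final class FakeProducer
  {
    final List<String> sent = new ArrayList<>();

    void send(String message)
    {
      sent.add(message);
    }
  }

  abstract static class AbstractOutput
  {
    private final FakeProducer producer = new FakeProducer();

    /** Protected: reachable from subclasses, but not part of the public API. */
    protected FakeProducer getProducer()
    {
      return producer;
    }
  }

  static final class SinglePortOutput extends AbstractOutput
  {
    /** The subclass uses the inherited protected accessor to write a tuple. */
    public void process(String tuple)
    {
      getProducer().send(tuple);
    }

    public int sentCount()
    {
      return getProducer().sent.size();
    }
  }

  public static void main(String[] args)
  {
    SinglePortOutput out = new SinglePortOutput();
    out.process("hello");
    System.out.println(out.sentCount());
  }
}
```

Note that in Java a protected member is also visible to other classes in the same package, so the change narrows the API for callers outside the package without affecting subclasses.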




[jira] [Commented] (APEXMALHAR-2086) Kafka Output Operator with Kafka 0.9 API

2016-06-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319122#comment-15319122
 ] 

ASF GitHub Bot commented on APEXMALHAR-2086:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/298#discussion_r66131514
  
--- Diff: 
kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java
 ---
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.util.Properties;
+import javax.validation.constraints.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+public abstract class AbstractKafkaOutputOperator implements Operator
+{
+  @SuppressWarnings("unused")
+  private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaOutputOperator.class);
+  private transient Producer producer;  // K is key partitioner, V is value type
+  @NotNull
+  private String topic = "topic1";
+  private String producerProperties = "";
+  private Properties configProperties = new Properties();
+  public Properties getConfigProperties()
+  {
+return configProperties;
+  }
+
+  public void setConfigProperties(Properties configProperties)
+  {
+this.configProperties.putAll(configProperties);
+  }
+
+  /**
+   * setup producer configuration.
+   * @return ProducerConfig
+   */
+  protected Properties createKafkaProducerConfig()
+  {
+Properties prop = new Properties();
+for (String propString : producerProperties.split(",")) {
+  if (!propString.contains("=")) {
+continue;
+  }
+  String[] keyVal = StringUtils.trim(propString).split("=");
+  prop.put(StringUtils.trim(keyVal[0]), StringUtils.trim(keyVal[1]));
+}
+
+configProperties.putAll(prop);
+
+return configProperties;
+  }
+
+  public Producer getProducer()
--- End diff --

Is there any reason this is public? It is being accessed from the subclass, so 
it can be made protected.




[jira] [Commented] (APEXMALHAR-2086) Kafka Output Operator with Kafka 0.9 API

2016-06-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319118#comment-15319118
 ] 

ASF GitHub Bot commented on APEXMALHAR-2086:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/298#discussion_r66131334
  
--- Diff: 
kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java
 ---
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.util.Properties;
+import javax.validation.constraints.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+public abstract class AbstractKafkaOutputOperator implements Operator
+{
+  @SuppressWarnings("unused")
+  private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaOutputOperator.class);
+  private transient Producer producer;  // K is key partitioner, V is value type
+  @NotNull
+  private String topic = "topic1";
+  private String producerProperties = "";
+  private Properties configProperties = new Properties();
+  public Properties getConfigProperties()
--- End diff --

And also add javadoc for each of those.




[jira] [Commented] (APEXMALHAR-2086) Kafka Output Operator with Kafka 0.9 API

2016-06-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319117#comment-15319117
 ] 

ASF GitHub Bot commented on APEXMALHAR-2086:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/298#discussion_r66131266
  
--- Diff: 
kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java
 ---
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.util.Properties;
+import javax.validation.constraints.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+public abstract class AbstractKafkaOutputOperator implements Operator
+{
+  @SuppressWarnings("unused")
+  private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaOutputOperator.class);
+  private transient Producer producer;  // K is key partitioner, V is value type
+  @NotNull
+  private String topic = "topic1";
+  private String producerProperties = "";
+  private Properties configProperties = new Properties();
+  public Properties getConfigProperties()
--- End diff --

We don't have a rule requiring this, but it would be good if you could 
keep the property accessor methods in one place.




[GitHub] apex-malhar pull request #298: *For review only* [APEXMALHAR-2086] Kafka out...

2016-06-07 Thread chinmaykolhatkar
Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/298#discussion_r66131266
  
--- Diff: 
kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java
 ---
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.util.Properties;
+import javax.validation.constraints.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+public abstract class AbstractKafkaOutputOperator implements Operator
+{
+  @SuppressWarnings("unused")
+  private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaOutputOperator.class);
+  private transient Producer producer;  // K is key partitioner, V is value type
+  @NotNull
+  private String topic = "topic1";
+  private String producerProperties = "";
+  private Properties configProperties = new Properties();
+  public Properties getConfigProperties()
--- End diff --

We don't have a rule requiring this, but it would be good if you could 
keep the property accessor methods in one place.




[jira] [Commented] (APEXMALHAR-2086) Kafka Output Operator with Kafka 0.9 API

2016-06-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319114#comment-15319114
 ] 

ASF GitHub Bot commented on APEXMALHAR-2086:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/298#discussion_r66130894
  
--- Diff: 
kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java
 ---
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.util.Properties;
+import javax.validation.constraints.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+public abstract class AbstractKafkaOutputOperator implements Operator
+{
+  @SuppressWarnings("unused")
+  private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaOutputOperator.class);
+  private transient Producer producer;  // K is key partitioner, V is value type
+  @NotNull
+  private String topic = "topic1";
--- End diff --

Why is the default value of the "topic" property set to "topic1"? That 
defeats the purpose of "@NotNull".
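
The point of this comment can be illustrated with a small sketch. It assumes the not-null constraint amounts to a plain null check on the field; the passesNotNull helper below is a hypothetical stand-in for that check and does not use the javax.validation API. The two holder classes are likewise invented for illustration.

```java
final class TopicDefaultSketch
{
  /** Mirrors the field in the diff: a default value is always present. */
  static final class WithDefault
  {
    String topic = "topic1";
  }

  /** Same field without a default: null until the user configures it. */
  static final class WithoutDefault
  {
    String topic;
  }

  /** Stand-in for a not-null constraint check; true means the value passes. */
  static boolean passesNotNull(String value)
  {
    return value != null;
  }

  public static void main(String[] args)
  {
    // A forgotten topic goes unnoticed when a default exists.
    System.out.println(passesNotNull(new WithDefault().topic));
    // Without a default, the same omission is caught by the check.
    System.out.println(passesNotNull(new WithoutDefault().topic));
  }
}
```

With the default in place the check can only fail if someone explicitly sets the property to null, so a user who forgets to configure the topic would silently write to "topic1".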




[GitHub] apex-malhar pull request #298: *For review only* [APEXMALHAR-2086] Kafka out...

2016-06-07 Thread chinmaykolhatkar
Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/298#discussion_r66130894
  
--- Diff: 
kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java
 ---
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.util.Properties;
+import javax.validation.constraints.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+public abstract class AbstractKafkaOutputOperator implements Operator
+{
+  @SuppressWarnings("unused")
+  private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaOutputOperator.class);
+  private transient Producer producer;  // K is key partitioner, V is value type
+  @NotNull
+  private String topic = "topic1";
--- End diff --

Why is the default value of the "topic" property set to "topic1"? That 
defeats the purpose of "@NotNull".




[jira] [Commented] (APEXMALHAR-2086) Kafka Output Operator with Kafka 0.9 API

2016-06-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319035#comment-15319035
 ] 

ASF GitHub Bot commented on APEXMALHAR-2086:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/298#discussion_r66122516
  
--- Diff: 
kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java
 ---
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import com.datatorrent.api.DefaultInputPort;
+
+public class KafkaSinglePortOutputOperator extends AbstractKafkaOutputOperator
--- End diff --

Can you mark this as evolving?


> Kafka Output Operator with Kafka 0.9 API
> 
>
> Key: APEXMALHAR-2086
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2086
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Sandesh
>Assignee: Sandesh
>
> Goal: 2 Operators for Kafka Output
>   1. Simple Kafka Output Operator
> - Supports At Least Once
> - Expose most used producer properties as class properties
>   2. Exactly Once Kafka Output (not possible in all cases; will be
> documented later)
> 
> Design for Exactly Once
> Window Data Manager - Stores the Kafka partition offsets.
> Kafka Key - Used by the operator = AppID#OperatorId
> During recovery, a partially written window is re-created using the
> following approach:
> Tuples between the largest recovery offsets and the current offset are
> checked. Based on the key, tuples written by other entities are
> discarded.
> Only tuples which are not in the recovered set are emitted.
> Tuples need to be unique within the window.
>   
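
The recovery approach described above can be sketched roughly as follows. This
is only an illustration under stated assumptions, not the operator from the
pull request: the class ExactlyOnceRecoverySketch, the LogRecord type and all
method names are hypothetical, and a plain in-memory list stands in for a
Kafka partition so the example has no Kafka dependency.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative sketch only; these types are hypothetical, not Malhar or Kafka APIs. */
class ExactlyOnceRecoverySketch {

  /** Stand-in for a record that was already written to a Kafka partition. */
  static final class LogRecord {
    final String key;
    final String value;

    LogRecord(String key, String value) {
      this.key = key;
      this.value = value;
    }
  }

  /** Key the operator attaches to every record it writes: AppID#OperatorId. */
  static String operatorKey(String appId, int operatorId) {
    return appId + "#" + operatorId;
  }

  /**
   * Collects the values this operator already wrote between the saved recovery
   * offset and the current end of the partition. Records with any other key
   * were written by other entities and are skipped.
   */
  static Set<String> recoveredSet(List<LogRecord> partition, int recoveryOffset, String ownKey) {
    Set<String> recovered = new HashSet<>();
    for (int offset = recoveryOffset; offset < partition.size(); offset++) {
      LogRecord record = partition.get(offset);
      if (ownKey.equals(record.key)) {
        recovered.add(record.value);
      }
    }
    return recovered;
  }

  /** Returns the tuples of the replayed window that still have to be written. */
  static List<String> tuplesToEmit(List<String> replayedWindow, Set<String> recovered) {
    List<String> pending = new ArrayList<>();
    for (String tuple : replayedWindow) {
      if (!recovered.contains(tuple)) {
        pending.add(tuple);
      }
    }
    return pending;
  }

  public static void main(String[] args) {
    String ownKey = operatorKey("app1", 3);
    List<LogRecord> partition = new ArrayList<>();
    partition.add(new LogRecord(ownKey, "a"));   // offset 0: earlier, completed window
    partition.add(new LogRecord(ownKey, "b"));   // offset 1: partial window starts here
    partition.add(new LogRecord("app2#7", "c")); // offset 2: written by another entity
    partition.add(new LogRecord(ownKey, "d"));   // offset 3

    Set<String> recovered = recoveredSet(partition, 1, ownKey);
    List<String> replayed = Arrays.asList("b", "c", "d", "e");
    System.out.println(tuplesToEmit(replayed, recovered)); // prints [c, e]
  }
}
```

Comparing by value in a set only works because tuples are required to be
unique within the window; with duplicates, a tuple written once but sent twice
could not be told apart from one that was fully written.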



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] apex-site pull request #36: Adding announcements and meetups to Apex front p...

2016-06-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/apex-site/pull/36


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] apex-site issue #36: Adding announcements and meetups to Apex front page

2016-06-07 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/apex-site/pull/36
  
@sashadt Already live :-)




[GitHub] apex-site issue #36: Adding announcements and meetups to Apex front page

2016-06-07 Thread sashadt
Github user sashadt commented on the issue:

https://github.com/apache/apex-site/pull/36
  
@tweise Do you want to wait until more content is added, or should we
merge this PR?




Re: Facing issue in merging PR

2016-06-07 Thread Shubham Pathak
Thank you Thomas.

For a few committers the issue seems to have been resolved, but I am still
facing it when trying to push.

shubham@shubham:/code/projects/Apache/commit/apex-malhar$ git push origin master
Username for 'https://git-wip-us.apache.org': shubham
Password for 'https://shub...@git-wip-us.apache.org':
Counting objects: 115, done.
Delta compression using up to 8 threads.
Compressing objects: 100% (23/23), done.
Writing objects: 100% (33/33), 10.28 KiB | 0 bytes/s, done.
Total 33 (delta 5), reused 0 (delta 0)
remote: You are not authorized to edit this repository.
remote:
To https://git-wip-us.apache.org/repos/asf/apex-malhar.git
 ! [remote rejected] master -> master (pre-receive hook declined)
error: failed to push some refs to 'https://git-wip-us.apache.org/repos/asf/apex-malhar.git'

Am I missing a step here?

Thanks,
Shubham

On Mon, Jun 6, 2016 at 9:33 PM, Thomas Weise  wrote:

> Looks like the committer group isn't set up yet. It is tracked here:
>
> https://issues.apache.org/jira/browse/INFRA-11719
>
>
>
> On Mon, Jun 6, 2016 at 4:36 AM, Shubham Pathak 
> wrote:
>
> > Hello,
> >
> > I am facing the same issue.
> >
> > shubham@shubham:/code/projects/Apache/commit/apex-malhar$ git remote -v
> > origin https://git-wip-us.apache.org/repos/asf/apex-malhar.git (fetch)
> > origin https://git-wip-us.apache.org/repos/asf/apex-malhar.git (push)
> > shubham@shubham:/code/projects/Apache/commit/apex-malhar$ git branch
> > * master
> > shubham@shubham:/code/projects/Apache/commit/apex-malhar$ git push origin master
> > Username for 'https://git-wip-us.apache.org': shubham
> > Password for 'https://shub...@git-wip-us.apache.org':
> > Counting objects: 107, done.
> > Delta compression using up to 8 threads.
> > Compressing objects: 100% (15/15), done.
> > Writing objects: 100% (25/25), 9.51 KiB | 0 bytes/s, done.
> > Total 25 (delta 3), reused 0 (delta 0)
> > remote: You are not authorized to edit this repository.
> > remote:
> > To https://git-wip-us.apache.org/repos/asf/apex-malhar.git
> >  ! [remote rejected] master -> master (pre-receive hook declined)
> > error: failed to push some refs to 'https://git-wip-us.apache.org/repos/asf/apex-malhar.git'
> >
> > Thanks,
> > Shubham
> >
> > On Mon, Jun 6, 2016 at 5:04 PM, Yogi Devendra <
> > devendra.vyavah...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > I am merging a PR for the first time and am getting the following error.
> > >
> > > remote: You are not authorized to edit this repository.
> > >
> > >
> > > Below is the trace from the console. Is it a permission issue with my
> > > account or with the new repo links?
> > >
> > > 
> > > 
> > >
> > > ✔ /repo/apex-malhar [master|✔]
> > > 16:01 $ git remote -v
> > > apache  https://git-wip-us.apache.org/repos/asf/apex-malhar.git/ (fetch)
> > > apache  https://git-wip-us.apache.org/repos/asf/apex-malhar.git/ (push)
> > > origin  g...@github.com:apache/apex-malhar.git (fetch)
> > > origin  g...@github.com:apache/apex-malhar.git (push)
> > > upstream  g...@github.com:apache/apex-malhar.git (fetch)
> > > upstream  g...@github.com:apache/apex-malhar.git (push)
> > > ✔ /repo/apex-malhar [master|✔]
> > > 16:01 $ git fetch apache
> > > ✔ /repo/apex-malhar [master|✔]
> > > 16:02 $ git reset --hard apache/master
> > > HEAD is now at 3e1cd8f APEXMALHAR-2112 Add logging exclusions in geode
> > > dependency for running of all other contrib tests with 1.7 Java.
> > > ✔ /repo/apex-malhar [master|✔]
> > > 16:03 $ git pull https://github.com/tweise/incubator-apex-malhar master
> > > From https://github.com/tweise/incubator-apex-malhar
> > >  * branch            master     -> FETCH_HEAD
> > > Updating 3e1cd8f..18588f3
> > > Fast-forward
> > >  library/src/main/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperator.java | 21 +++--
> > >  1 file changed, 11 insertions(+), 10 deletions(-)
> > > ✔ /repo/apex-malhar [master ↑·1|✔]
> > > 16:03 $ git push apache master
> > > Username for 'https://git-wip-us.apache.org': yogidevendra
> > > Password for 'https://yogideven...@git-wip-us.apache.org':
> > > Counting objects: 62, done.
> > > Delta compression using up to 8 threads.
> > > Compressing objects: 100% (10/10), done.
> > > Writing objects: 100% (13/13), 1.10 KiB | 0 bytes/s, done.
> > > Total 13 (delta 4), reused 0 (delta 0)
> > > remote: You are not authorized to edit this repository.
> > > remote:
> > > To https://git-wip-us.apache.org/repos/asf/apex-malhar.git/
> > >  ! [remote rejected] master -> master (pre-receive hook declined)
> > > error: failed to push some refs to
> > > 'https://git-wip-us.apache.org/repos/asf/apex-malhar.git/'
> > > ✘-1 /repo/apex-malhar [master ↑·1|✔]
> > > 16:04 $ git checkout -b master-try2
> > > Switched to a new branch 'master-try2'
> > > ✔