[jira] [Commented] (APEXCORE-339) Support ability to tag operators as idempotent or non-idempotent
[ https://issues.apache.org/jira/browse/APEXCORE-339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15319830#comment-15319830 ]

Thomas Weise commented on APEXCORE-339:
---------------------------------------

[~PramodSSImmaneni] can you please provide more info WRT the implementation approach here.

> Support ability to tag operators as idempotent or non-idempotent
> ----------------------------------------------------------------
>
>                 Key: APEXCORE-339
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-339
>             Project: Apache Apex Core
>          Issue Type: Bug
>            Reporter: Pramod Immaneni
>            Assignee: Pramod Immaneni
>
> Certain applications require idempotency and some others don't. Additionally,
> operators such as input operators need to be specially instrumented to
> exhibit idempotent behavior. Certain output operators, like database operators,
> rely on idempotency. For these reasons we need the ability to label operators as
> idempotent or not, and whether output operators require idempotency from upstream
> operators or not. In case of a failure, the platform should handle
> non-idempotent streams by restarting the failed operator and all downstream
> operators at the same checkpoint.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
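The recovery rule in the issue description (restart the failed operator and all downstream operators at the same checkpoint) can be sketched as a graph traversal. This is only an illustration under stated assumptions: the class `RestartSetSketch`, the method `operatorsToRestart`, the adjacency-map representation, and the operator names are all hypothetical and are not part of the Apex API or of the implementation being discussed.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not Apex code: the DAG is modeled as a map from an
// operator name to the names of its directly downstream operators.
class RestartSetSketch
{
  // Returns the failed operator plus every operator reachable downstream of it.
  static Set<String> operatorsToRestart(Map<String, List<String>> downstream, String failed)
  {
    Set<String> restart = new LinkedHashSet<>();
    Deque<String> pending = new ArrayDeque<>();
    pending.add(failed);
    while (!pending.isEmpty()) {
      String op = pending.remove();
      // add() returns false for operators already visited, so shared
      // downstream operators are expanded only once
      if (restart.add(op)) {
        pending.addAll(downstream.getOrDefault(op, List.of()));
      }
    }
    return restart;
  }

  public static void main(String[] args)
  {
    // Assumed example topology: input -> parser -> {dedup, counter} -> dbOutput
    Map<String, List<String>> dag = Map.of(
        "input", List.of("parser"),
        "parser", List.of("dedup", "counter"),
        "dedup", List.of("dbOutput"),
        "counter", List.of("dbOutput"));
    System.out.println(operatorsToRestart(dag, "parser"));
  }
}
```

In this sketch a failure of "parser" selects parser, dedup, counter and dbOutput for restart, while the upstream "input" operator is left running. How the platform would choose the common checkpoint is not covered here.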
[GitHub] apex-core pull request #349: APEXCORE-470 New API in DAG - setOperatorAttrib...
Github user PramodSSImmaneni commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/349#discussion_r66172343

    --- Diff: api/src/main/java/com/datatorrent/api/DAG.java ---
    @@ -238,9 +238,15 @@
       /**
        * setAttribute.
        */
    +  @Deprecated
       public abstract <T> void setAttribute(Operator operator, Attribute<T> key, T value);

       /**
    +   * setOperatorAttribute.
    --- End diff --

    javadoc

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
[jira] [Commented] (APEXCORE-470) New Api for setting the attribute on the operator ( setOperatorAttribute )
[ https://issues.apache.org/jira/browse/APEXCORE-470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15319709#comment-15319709 ]

ASF GitHub Bot commented on APEXCORE-470:
-----------------------------------------

Github user PramodSSImmaneni commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/349#discussion_r66172343

    --- Diff: api/src/main/java/com/datatorrent/api/DAG.java ---
    @@ -238,9 +238,15 @@
       /**
        * setAttribute.
        */
    +  @Deprecated
       public abstract <T> void setAttribute(Operator operator, Attribute<T> key, T value);

       /**
    +   * setOperatorAttribute.
    --- End diff --

    javadoc

> New Api for setting the attribute on the operator ( setOperatorAttribute )
> --------------------------------------------------------------------------
>
>                 Key: APEXCORE-470
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-470
>             Project: Apache Apex Core
>          Issue Type: Improvement
>            Reporter: Sandesh
>            Assignee: Sandesh
>
> Currently, *setAttribute* is used to set operator attributes. The other two
> attribute-setting APIs are specific to input ports (*setInputPortAttributes*)
> and output ports (*setOutputPortsAttributes*).
> The proposal is to add a *setOperatorAttribute* API, which clearly indicates
> that the user wants to set attributes on the operator.
> ( setOperatorAttribute(Operator operator, Attribute<T> key, T value) )
> The roles of the APIs will be as follows:
> *setAttributes* --> for setting attributes for the whole DAG (
> setAttribute(Operator operator, Attribute<T> key, T value) can be
> deprecated )
> *setOperatorAttributes* --> for setting attributes for the operator
> All the unit test cases using the previous API will be updated as part of
> this change.
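One common way to introduce a clearer method name while keeping old callers working is to have the deprecated method forward to the new one. The sketch below illustrates that pattern only. `AttrKey`, `DagSketch` and `InMemoryDag` are hypothetical stand-ins, not the real `com.datatorrent.api` types, and the pull request itself keeps `setAttribute` abstract rather than forwarding, so the delegation shown here is an assumption about one possible approach.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical typed attribute key, standing in for an attribute type.
final class AttrKey<T>
{
  final String name;

  AttrKey(String name)
  {
    this.name = name;
  }
}

// Hypothetical DAG-like base class showing the rename-with-deprecation pattern.
abstract class DagSketch
{
  /** New, explicitly named entry point for operator attributes. */
  public abstract <T> void setOperatorAttribute(String operatorName, AttrKey<T> key, T value);

  /** Old name kept for source compatibility; forwards to the new method. */
  @Deprecated
  public <T> void setAttribute(String operatorName, AttrKey<T> key, T value)
  {
    setOperatorAttribute(operatorName, key, value);
  }
}

// Minimal implementation that records attributes in a map.
class InMemoryDag extends DagSketch
{
  final Map<String, Object> attributes = new HashMap<>();

  @Override
  public <T> void setOperatorAttribute(String operatorName, AttrKey<T> key, T value)
  {
    attributes.put(operatorName + "." + key.name, value);
  }

  @SuppressWarnings("deprecation")
  public static void main(String[] args)
  {
    InMemoryDag dag = new InMemoryDag();
    AttrKey<Integer> memory = new AttrKey<>("MEMORY_MB");
    dag.setAttribute("parser", memory, 512);          // old call site still works
    dag.setOperatorAttribute("writer", memory, 1024); // new, clearer name
    System.out.println(dag.attributes);
  }
}
```

With this shape, existing call sites compile with a deprecation warning and behave exactly like the new method, which gives users time to migrate.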
[GitHub] apex-core pull request #349: APEXCORE-470
GitHub user sandeshh opened a pull request:

    https://github.com/apache/apex-core/pull/349

    APEXCORE-470

    1. Added the new API - setOperatorAttribute
    2. Updated the unit tests
    3. Marked "setAttribute(Operator..." as deprecated

    @PramodSSImmaneni please review.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sandeshh/apex-core APEXCORE-470

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-core/pull/349.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #349

----
commit bcd3426ba65f5291925dedb1e4fdfef4c80096ed
Author: sandeshh
Date:   2016-06-07T23:27:26Z

    APEXCORE-470 - Added the new api setOperatorAttribute and updated the tests, also marked setAttribute(Operator... as deprecated
[GitHub] apex-malhar pull request #282: APEXMALHAR-2066 JdbcPolling,idempotent,partit...
Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/282#discussion_r66169324

    --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---
    @@ -0,0 +1,652 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.db.jdbc;
    +
    +import java.io.IOException;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import javax.validation.constraints.Min;
    +
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
    +import org.apache.apex.malhar.lib.wal.WindowDataManager;
    +import org.apache.commons.lang3.tuple.MutablePair;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.api.DefaultPartition;
    +import com.datatorrent.api.Operator.ActivationListener;
    +import com.datatorrent.api.Operator.IdleTimeHandler;
    +import com.datatorrent.api.Partitioner;
    +import com.datatorrent.api.annotation.Stateless;
    +import com.datatorrent.lib.db.AbstractStoreInputOperator;
    +import com.datatorrent.lib.util.KeyValPair;
    +import com.datatorrent.lib.util.KryoCloneUtils;
    +import com.datatorrent.netlet.util.DTThrowable;
    +
    +/**
    + * Abstract operator for consuming data using JDBC interface
    + * User needs to provide
    + * tableName,dbConnection,setEmitColumnList,look-up key
    + * Optionally batchSize,pollInterval,Look-up key and a where clause can be given
    + *
    + * This operator uses static partitioning to arrive at range queries for exactly
    + * once reads
    + * Assumption is that there is an ordered column using which range queries can
    + * be formed
    + * If an emitColumnList is provided, please ensure that the keyColumn is the
    + * first column in the list
    + * Range queries are formed using the {@link JdbcMetaDataUtility}} Output -
    + * comma separated list of the emit columns eg columnA,columnB,columnC
    + *
    + * @displayName Jdbc Polling Input Operator
    + * @category Input
    + * @tags database, sql, jdbc, partitionable,exactlyOnce
    + */
    +public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator
    +implements ActivationListener, IdleTimeHandler, Partitioner
    +{
    +  /*
    +   * poll interval in milliseconds
    +   */
    +  private int pollInterval;
    +
    +  @Min(1)
    +  private int partitionCount = 1;
    +  protected transient int operatorId;
    +  protected transient boolean isReplayed;
    +  protected transient boolean isPollable;
    +  protected int batchSize;
    +  protected int fetchSize;
    +  /**
    +   * Map of windowId to of the range key
    +   */
    +  protected transient MutablePair currentWindowRecoveryState;
    +
    +  /**
    +   * size of the emit queue used to hold polled records before emit
    +   */
    +  private int queueCapacity = 4 * 1024 * 1024;
    +  private transient volatile boolean execute;
    +  private transient AtomicReference cause;
    +  protected transient int spinMillis;
    +  private transient OperatorContext context;
    +  protected String tableName;
    +  protected String key;
    +  protected long currentWindowId;
    +  protected KeyValPair rangeQueryPair;
    +  protected String lower;
    +  protected String upper;
    +  protected boolean recovered;
    +  protected boolean isPolled;
    +  protected String whereCondition = null;
    --- End diff --

    Can you also clarify which subset of SQL will be supported here? How complex can the where clause be?
[jira] [Commented] (APEXMALHAR-2066) Add jdbc poller input operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15319666#comment-15319666 ]

ASF GitHub Bot commented on APEXMALHAR-2066:
--------------------------------------------

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/282#discussion_r66169324
[GitHub] apex-malhar pull request #314: APEXMALHAR-2113 bug fix, app test case,update...
Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/314#discussion_r66168863

    --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java ---
    @@ -34,25 +33,30 @@
     import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator;

     /**
    - * This is the base class implementation of a transactionable JDBC output operator.
    - * Subclasses should implement the method which provides the insertion command.
    + * This is the base class implementation of a transactionable JDBC output
    --- End diff --

    Please remove the formatting / check-style changes from this PR. Let's have just the relevant changes in this PR.
[jira] [Commented] (APEXCORE-339) Support ability to tag operators as idempotent or non-idempotent
[ https://issues.apache.org/jira/browse/APEXCORE-339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15319648#comment-15319648 ]

Sandesh commented on APEXCORE-339:
----------------------------------

How does this intersect with -> ProcessingMode - EXACTLY_ONCE ?

> Support ability to tag operators as idempotent or non-idempotent
> ----------------------------------------------------------------
>
>                 Key: APEXCORE-339
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-339
>             Project: Apache Apex Core
>          Issue Type: Bug
>            Reporter: Pramod Immaneni
>            Assignee: Pramod Immaneni
>
> Certain applications require idempotency and some others don't. Additionally,
> operators such as input operators need to be specially instrumented to
> exhibit idempotent behavior. Certain output operators, like database operators,
> rely on idempotency. For these reasons we need the ability to label operators as
> idempotent or not, and whether output operators require idempotency from upstream
> operators or not. In case of a failure, the platform should handle
> non-idempotent streams by restarting the failed operator and all downstream
> operators at the same checkpoint.
[jira] [Comment Edited] (APEXCORE-339) Support ability to tag operators as idempotent or non-idempotent
[ https://issues.apache.org/jira/browse/APEXCORE-339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15225223#comment-15225223 ]

Pramod Immaneni edited comment on APEXCORE-339 at 6/7/16 10:54 PM:
-------------------------------------------------------------------

Follow development here https://github.com/PramodSSImmaneni/incubator-apex-core/tree/APEXCORE-339


was (Author: pramodssimmaneni):
Follow development here https://github.com/PramodSSImmaneni/incubator-apex-core/tree/idempotence-recovery

> Support ability to tag operators as idempotent or non-idempotent
> ----------------------------------------------------------------
>
>                 Key: APEXCORE-339
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-339
>             Project: Apache Apex Core
>          Issue Type: Bug
>            Reporter: Pramod Immaneni
>            Assignee: Pramod Immaneni
>
> Certain applications require idempotency and some others don't. Additionally,
> operators such as input operators need to be specially instrumented to
> exhibit idempotent behavior. Certain output operators, like database operators,
> rely on idempotency. For these reasons we need the ability to label operators as
> idempotent or not, and whether output operators require idempotency from upstream
> operators or not. In case of a failure, the platform should handle
> non-idempotent streams by restarting the failed operator and all downstream
> operators at the same checkpoint.
[GitHub] apex-malhar pull request #295: APEXMALHAR-1966: Update casandra output oprea...
Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/295#discussion_r66164298

    --- Diff: benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java ---
    @@ -48,4 +48,9 @@ protected Statement setStatementParameters(PreparedStatement updateCommand,
         return boundStmnt.bind(id++,tuple);
       }

    +  @Override
    +  public void deactivate()
    --- End diff --

    Can you move this to the Abstract parent class? Otherwise every implementation would have to include this method.
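The review suggestion above (implement the lifecycle method once in the abstract parent so concrete operators inherit it) can be sketched as follows. `ActivationHooks`, `AbstractOutputSketch` and `BenchmarkOutputSketch` are hypothetical miniatures written for this illustration; they are not the Malhar Cassandra classes, and the `active` flag is an assumed stand-in for whatever resource the real method manages.

```java
// Hypothetical lifecycle interface, standing in for an activation listener.
interface ActivationHooks
{
  void activate();

  void deactivate();
}

// Hypothetical abstract parent: lifecycle handling lives here, once.
abstract class AbstractOutputSketch implements ActivationHooks
{
  protected boolean active;

  @Override
  public void activate()
  {
    active = true;
  }

  // Implemented in the parent so that subclasses do not have to repeat it
  @Override
  public void deactivate()
  {
    active = false;
  }

  // Subclasses supply only what actually differs between them
  abstract String format(int id, String tuple);
}

// Concrete operator: no deactivate() override is needed.
class BenchmarkOutputSketch extends AbstractOutputSketch
{
  @Override
  String format(int id, String tuple)
  {
    return id + ":" + tuple;
  }

  public static void main(String[] args)
  {
    BenchmarkOutputSketch op = new BenchmarkOutputSketch();
    op.activate();
    System.out.println(op.format(1, "row") + " active=" + op.active);
    op.deactivate();
    System.out.println("active=" + op.active);
  }
}
```

A subclass that needs extra cleanup can still override `deactivate()` and call `super.deactivate()`, so moving the method up does not remove flexibility.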
[jira] [Commented] (APEXMALHAR-1966) Cassandra output operator improvements
[ https://issues.apache.org/jira/browse/APEXMALHAR-1966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15319589#comment-15319589 ]

ASF GitHub Bot commented on APEXMALHAR-1966:
--------------------------------------------

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/295#discussion_r66164298

    --- Diff: benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java ---
    @@ -48,4 +48,9 @@ protected Statement setStatementParameters(PreparedStatement updateCommand,
         return boundStmnt.bind(id++,tuple);
       }

    +  @Override
    +  public void deactivate()
    --- End diff --

    Can you move this to the Abstract parent class? Otherwise every implementation would have to include this method.

> Cassandra output operator improvements
> --------------------------------------
>
>                 Key: APEXMALHAR-1966
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-1966
>             Project: Apache Apex Malhar
>          Issue Type: Improvement
>            Reporter: Priyanka Gugale
>            Assignee: Priyanka Gugale
>
> Update the existing Cassandra output operator to:
> 1. Accept user-defined parameterized queries; the queries could be for update,
> insert or delete.
> 2. Add an error port to emit tuples which couldn't be written to the database.
> 3. Add metrics
> 4. Provide a way to restrict batch size
[jira] [Commented] (APEXMALHAR-2094) Quantiles sketch operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15319572#comment-15319572 ]

ASF GitHub Bot commented on APEXMALHAR-2094:
--------------------------------------------

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/301#discussion_r66163272
[GitHub] apex-malhar pull request #301: [APEXMALHAR-2094] Quantiles operator
Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/301#discussion_r66163272

    --- Diff: sketches/src/main/java/org/apache/apex/malhar/sketches/QuantilesEstimator.java ---
    @@ -0,0 +1,188 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.sketches;
    +
    +import com.yahoo.sketches.quantiles.QuantilesSketch;
    +
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.annotation.OperatorAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * An implementation of BaseOperator that computes a "sketch" (a representation of the probability distribution using
    + * a low memory footprint) of the incoming numeric data, and evaluates/outputs the cumulative distribution function and
    + * quantiles of the probability distribution. Leverages the quantiles sketch implementation from the Yahoo Datasketches
    + * Library.
    + *
    + * Input Port(s) :
    + * data : Data values input port.
    + *
    + * Output Port(s) :
    + * cdfOutput : cumulative distribution function output port.
    + * quantilesOutput : quantiles output port.
    + *
    + * Partitions : No unifier. Merging these sketches is non-trivial.
    + *
    +
    + */
    +@OperatorAnnotation(partitionable = false)
    +public class QuantilesEstimator extends BaseOperator
    +{
    +  /**
    +   * Constructor that allows non-default initialization of the quantile sketch object
    +   *
    +   * @param k:Parameter that determines accuracy and memory usage of quantile sketch. See QuantilesSketch
    +   *         documentation for details
    +   * @param seed: The quantile sketch algorithm is inherently random. Set seed to 0 for reproducibility in testing, but
    +   *            do not set otherwise.
    +   */
    +  public QuantilesEstimator(int k, short seed)
    +  {
    +    quantilesSketch = QuantilesSketch.builder().setSeed(seed).build(k);
    +  }
    +
    +  private transient QuantilesSketch quantilesSketch = QuantilesSketch.builder().build();
    +
    +  /**
    +   * This field determines the specific quantiles to be calculated. For a stream of numbers, the quantile at a value
    +   * 0 <= p <= 1 is the number x such that a fraction p of the numbers in the sorted stream are less than x. E.g., the
    +   * quantile at p = 0.5 is the median (half the numbers in the stream are less than the median).
    +   * The default is set to compute the standard quartiles (4-quantiles).
    +   */
    +  private double[] fractions = {0.0, 0.25, 0.50, 0.75, 1.00};
    +  /**
    +   * This field determines the intervals on which the probability mass function is computed.
    +   */
    +  private double[] pmfIntervals = {};
    +
    +  /**
    +   * This operator computes three different quantities which are output on separate output ports. If not using any of
    +   * these quantities, these variables can be set to avoid unnecessary computation.
    +   */
    +  private boolean computeCdf = true;
    +  private boolean computeQuantiles = true;
    +  private boolean computePmf = true;
    +
    +  public boolean isComputeCdf()
    +  {
    +    return computeCdf;
    +  }
    +
    +  public void setComputeCdf(boolean computeCdf)
    +  {
    +    this.computeCdf = computeCdf;
    +  }
    +
    +  public boolean isComputeQuantiles()
    +  {
    +    return computeQuantiles;
    +  }
    +
    +  public void setComputeQuantiles(boolean computeQuantiles)
    +  {
    +    this.computeQuantiles = computeQuantiles;
    +  }
    +
    +  public boolean isComputePmf()
    +  {
    +    return computePmf;
    +  }
    +
    +  public void setComputePmf(boolean computePmf)
    +  {
    +    this.computePmf = computePmf;
    +  }
    +
    +  public int getK()
    +  {
    +    return quantilesSketch.getK();
    +  }
    +
    +  public double[] getFractions()
    +  {
    +    return fractions;
    +  }
    +
    +  public void setFractions(double[]
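The quantile definition in the `fractions` javadoc above can be made concrete with an exact computation over a fully sorted array. This is deliberately not the sketch algorithm from the Datasketches library, which approximates the same quantities in bounded memory. The class `ExactQuantilesSketch`, the method `quantile`, and the index rule used (take the element at position floor(p * n), clamped to the last element) are assumptions chosen for this illustration.

```java
import java.util.Arrays;

// Hypothetical exact reference computation, useful for reasoning about what
// an approximate quantiles sketch is estimating.
class ExactQuantilesSketch
{
  // Returns the element x of the sorted array such that roughly a fraction p
  // of the values lie below x.
  static double quantile(double[] sorted, double p)
  {
    if (sorted.length == 0) {
      throw new IllegalArgumentException("no data");
    }
    if (p < 0.0 || p > 1.0) {
      throw new IllegalArgumentException("p must be in [0, 1]");
    }
    // floor(p * n) values precede the chosen element; clamp so p = 1 maps to the maximum
    int index = Math.min(sorted.length - 1, (int)Math.floor(p * sorted.length));
    return sorted[index];
  }

  public static void main(String[] args)
  {
    double[] data = {4.0, 1.0, 5.0, 3.0, 2.0};
    double[] sorted = data.clone();
    Arrays.sort(sorted);
    // Same default fractions as in the diff: the standard quartiles
    double[] fractions = {0.0, 0.25, 0.50, 0.75, 1.00};
    for (double p : fractions) {
      System.out.println("p=" + p + " -> " + quantile(sorted, p));
    }
  }
}
```

For the five values 1 to 5, the fraction 0.5 selects 3.0, the median, and the fractions 0.0 and 1.0 select the minimum and maximum. An approximate sketch trades this exactness for a memory footprint that does not grow with the stream.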
[GitHub] apex-malhar pull request #301: [APEXMALHAR-2094] Quantiles operator
Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/301#discussion_r66163056

    --- Diff: sketches/pom.xml ---
    @@ -0,0 +1,75 @@
    +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +         xmlns="http://maven.apache.org/POM/4.0.0"
    +         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +  <parent>
    +    <artifactId>malhar</artifactId>
    +    <groupId>org.apache.apex</groupId>
    +    <version>3.5.0-SNAPSHOT</version>
    +  </parent>
    +  <modelVersion>4.0.0</modelVersion>
    +
    +  <artifactId>sketches</artifactId>
    +  <name>Apache Apex Malhar Sketch Library</name>
    +  <packaging>jar</packaging>
    +
    +  <build>
    +    <plugins>
    +      <plugin>
    +        <groupId>org.apache.maven.plugins</groupId>
    +        <artifactId>maven-jar-plugin</artifactId>
    +        <version>2.4</version>
    +        <executions>
    +          <execution>
    +            <goals>
    +              <goal>test-jar</goal>
    +            </goals>
    +            <phase>package</phase>
    +          </execution>
    +        </executions>
    +      </plugin>
    +    </plugins>
    +  </build>
    +
    +  <dependencies>
    +    <dependency>
    +      <groupId>com.yahoo.datasketches</groupId>
    +      <artifactId>sketches-core</artifactId>
    +      <version>0.4.1</version>
    +    </dependency>
    +    <dependency>
    +      <groupId>${project.groupId}</groupId>
    +      <artifactId>malhar-library</artifactId>
    +      <version>${project.version}</version>
    +    </dependency>
    +    <dependency>
    +      <groupId>${project.groupId}</groupId>
    +      <artifactId>malhar-library</artifactId>
    +      <version>${project.version}</version>
    +      <scope>test</scope>
    +      <classifier>tests</classifier>
    +    </dependency>
    +  </dependencies>
    +
    +</project>
    --- End diff --

    add newline
[jira] [Commented] (APEXMALHAR-2094) Quantiles sketch operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319569#comment-15319569 ]

ASF GitHub Bot commented on APEXMALHAR-2094:

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/301#discussion_r66163010

    --- Diff: sketches/src/main/java/org/apache/apex/malhar/sketches/QuantilesEstimator.java ---
    @@ -0,0 +1,188 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.sketches;
    +
    +import com.yahoo.sketches.quantiles.QuantilesSketch;
    +
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.annotation.OperatorAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * An implementation of BaseOperator that computes a "sketch" (a representation of the probability distribution using
    + * a low memory footprint) of the incoming numeric data, and evaluates/outputs the cumulative distribution function and
    + * quantiles of the probability distribution. Leverages the quantiles sketch implementation from the Yahoo Datasketches
    + * Library.
    + *
    + * Input Port(s) :
    + * data : Data values input port.
    + *
    + * Output Port(s) :
    + * cdfOutput : cumulative distribution function output port.
    + * quantilesOutput : quantiles output port.
    + *
    + * Partitions : No unifier. Merging these sketches is non-trivial.
    + *
    +
    + */
    +@OperatorAnnotation(partitionable = false)
    +public class QuantilesEstimator extends BaseOperator
    +{
    +  /**
    +   * Constructor that allows non-default initialization of the quantile sketch object
    +   *
    +   * @param k:Parameter that determines accuracy and memory usage of quantile sketch. See QuantilesSketch
    +   * documentation for details
    +   * @param seed: The quantile sketch algorithm is inherently random. Set seed to 0 for reproducibility in testing, but
    +   * do not set otherwise.
    +   */
    +  public QuantilesEstimator(int k, short seed)
    --- End diff --

    You might also need to add a default constructor for Kryo serialization to work correctly.

> Quantiles sketch operator
> -------------------------
>
>                 Key: APEXMALHAR-2094
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2094
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>            Reporter: Sandeep Narayanaswami
>            Assignee: Sandeep Narayanaswami
>            Priority: Minor
>
> An operator that "sketches" in an online fashion the probability distribution
> of an input (numeric) data stream, enabling computation of quantiles and
> cumulative distribution functions.
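
The review comment about a default constructor can be illustrated with a small sketch. It assumes the serializer creates objects reflectively through a zero-argument constructor, which is how Kryo behaves unless a different instantiator strategy is configured. `SketchOperator`, `OnlyParameterized`, `NoArgConstructorDemo` and the default value 128 are hypothetical names and placeholders for illustration; none of them come from the pull request.

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-in for an operator that offers a parameterized constructor.
class SketchOperator
{
  private final int k;
  private final short seed;

  // Zero-argument constructor. It may be private: reflective instantiation can still reach it.
  // The value 128 is an arbitrary placeholder default for this sketch.
  private SketchOperator()
  {
    this(128, (short)0);
  }

  SketchOperator(int k, short seed)
  {
    this.k = k;
    this.seed = seed;
  }

  int getK()
  {
    return k;
  }

  short getSeed()
  {
    return seed;
  }
}

// Hypothetical class with no zero-argument constructor, to show the failure case.
class OnlyParameterized
{
  OnlyParameterized(int k)
  {
  }
}

class NoArgConstructorDemo
{
  // Reports whether a class declares a zero-argument constructor.
  static boolean hasNoArgConstructor(Class<?> type)
  {
    try {
      type.getDeclaredConstructor();
      return true;
    } catch (NoSuchMethodException e) {
      return false;
    }
  }

  // Mimics a reflection-based instantiator: find the zero-argument constructor and invoke it.
  static <T> T instantiate(Class<T> type)
  {
    try {
      Constructor<T> ctor = type.getDeclaredConstructor();
      ctor.setAccessible(true);
      return ctor.newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("No usable no-arg constructor on " + type.getName(), e);
    }
  }

  public static void main(String[] args)
  {
    System.out.println("k = " + instantiate(SketchOperator.class).getK());
    System.out.println("OnlyParameterized usable: " + hasNoArgConstructor(OnlyParameterized.class));
  }
}
```

The private zero-argument constructor keeps the public API unchanged while still letting a reflective instantiator create the object before its fields are restored.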
[jira] [Commented] (APEXMALHAR-2113) JdbcPOJOOutputOperator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319562#comment-15319562 ]

ASF GitHub Bot commented on APEXMALHAR-2113:

GitHub user devtagare opened a pull request:

    https://github.com/apache/apex-malhar/pull/314

    APEXMALHAR-2113 bug fix, app test case,updated test cases

    @bhupeshchawda could you please review and merge

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/devtagare/incubator-apex-malhar APEXMALHAR-2113-JdbcPOJOOutputOperatorFix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/314.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #314

commit d10cfb040a530115bf667333be6c299a9a7baaff
Author: devtagare
Date:   2016-06-07T18:28:34Z

    APEXMALHAR-2113 bug fix, app test case,updated test cases

> JdbcPOJOOutputOperator
> ----------------------
>
>                 Key: APEXMALHAR-2113
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2113
>             Project: Apache Apex Malhar
>          Issue Type: Bug
>            Reporter: devendra tagare
>            Assignee: devendra tagare
>             Fix For: 3.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> getUpdateCommand(); is marked as @NotNull in the
> AbstractJdbcTransactionableOutputOperator which is used by
> JdbcPOJOOutputOperator.
> This method is referenced during the validation phase of DAG and
> updateCommand is initialized only at setup. This is causing the DAG
> initialization to fail on constraints violation.
> Stack trace below,
> An error occurred trying to launch the application. Server message:
> javax.validation.ConstraintViolationException: Operator JdbcOutput violates constraints
> [ConstraintViolationImpl{rootBean=JdbcPOJOOutputOperator{name=null},
> propertyPath='updateCommand', message='may not be null',
> leafBean=JdbcPOJOOutputOperator{name=null}, value=null}]
>   at com.datatorrent.stram.plan.logical.LogicalPlan.validate(LogicalPlan.java:1680)
>   at com.datatorrent.stram.StramClient.<init>(StramClient.java:161)
>   at com.datatorrent.stram.client.StramAppLauncher.launchApp(StramAppLauncher.java:509)
>   at com.datatorrent.stram.cli.DTCli$LaunchCommand.execute(DTCli.java:2050)
>   at com.datatorrent.stram.cli.DTCli.launchAppPackage(DTCli.java:3456)
>   at com.datatorrent.stram.cli.DTCli.access$7100(DTCli.java:106)
>   at com.datatorrent.stram.cli.DTCli$LaunchCommand.execute(DTCli.java:1895)
>   at com.datatorrent.stram.cli.DTCli$3.run(DTCli.java:1449)
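
The ordering problem described in this issue can be sketched in plain Java without the validation framework. A property that is only populated in `setup()` is still null when a launch-time not-null check runs. `LazyCommandOperator`, `ValidationOrderDemo`, the table name and the SQL string are hypothetical and only illustrate the sequence of events; they are not the Malhar classes and do not show the fix made in the pull request.

```java
// Hypothetical stand-in for an operator whose property is derived only in setup().
class LazyCommandOperator
{
  private String tablename = "events";   // placeholder value for this sketch
  private String updateCommand;          // stays null until setup() runs

  // In the real operator this getter carries a not-null constraint.
  String getUpdateCommand()
  {
    return updateCommand;
  }

  // Called by the platform after launch-time validation has already happened.
  void setup()
  {
    updateCommand = "INSERT INTO " + tablename + " VALUES (?)";
  }
}

class ValidationOrderDemo
{
  // Mimics a launch-time check that rejects null values returned by constrained getters.
  static boolean passesNotNullCheck(LazyCommandOperator op)
  {
    return op.getUpdateCommand() != null;
  }

  public static void main(String[] args)
  {
    LazyCommandOperator op = new LazyCommandOperator();
    System.out.println("valid before setup: " + passesNotNullCheck(op));
    op.setup();
    System.out.println("valid after setup:  " + passesNotNullCheck(op));
  }
}
```

Because validation runs before `setup()`, the check fails at launch even though the value would have been available by the time the operator processes data.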
[jira] [Commented] (APEXMALHAR-2094) Quantiles sketch operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319544#comment-15319544 ]

ASF GitHub Bot commented on APEXMALHAR-2094:

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/301#discussion_r66161142

    --- Diff: sketches/src/main/java/org/apache/apex/malhar/sketches/QuantilesEstimator.java ---
    @@ -0,0 +1,188 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.sketches;
    +
    +import com.yahoo.sketches.quantiles.QuantilesSketch;
    +
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.annotation.OperatorAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * An implementation of BaseOperator that computes a "sketch" (a representation of the probability distribution using
    + * a low memory footprint) of the incoming numeric data, and evaluates/outputs the cumulative distribution function and
    + * quantiles of the probability distribution. Leverages the quantiles sketch implementation from the Yahoo Datasketches
    + * Library.
    + *
    + * Input Port(s) :
    + * data : Data values input port.
    + *
    + * Output Port(s) :
    + * cdfOutput : cumulative distribution function output port.
    + * quantilesOutput : quantiles output port.
    + *
    + * Partitions : No unifier. Merging these sketches is non-trivial.
    + *
    +
    + */
    +@OperatorAnnotation(partitionable = false)
    +public class QuantilesEstimator extends BaseOperator
    --- End diff --

    Do add the @Evolving annotation as this is a new operator

> Quantiles sketch operator
> -------------------------
>
>                 Key: APEXMALHAR-2094
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2094
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>            Reporter: Sandeep Narayanaswami
>            Assignee: Sandeep Narayanaswami
>            Priority: Minor
>
> An operator that "sketches" in an online fashion the probability distribution
> of an input (numeric) data stream, enabling computation of quantiles and
> cumulative distribution functions.
[jira] [Commented] (APEXMALHAR-2094) Quantiles sketch operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319539#comment-15319539 ]

ASF GitHub Bot commented on APEXMALHAR-2094:

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/301#discussion_r66160956

    --- Diff: sketches/src/main/java/org/apache/apex/malhar/sketches/QuantilesEstimator.java ---
    @@ -0,0 +1,188 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.sketches;
    +
    +import com.yahoo.sketches.quantiles.QuantilesSketch;
    +
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.annotation.OperatorAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * An implementation of BaseOperator that computes a "sketch" (a representation of the probability distribution using
    + * a low memory footprint) of the incoming numeric data, and evaluates/outputs the cumulative distribution function and
    + * quantiles of the probability distribution. Leverages the quantiles sketch implementation from the Yahoo Datasketches
    + * Library.
    + *
    + * Input Port(s) :
    + * data : Data values input port.
    + *
    + * Output Port(s) :
    + * cdfOutput : cumulative distribution function output port.
    + * quantilesOutput : quantiles output port.
    + *
    + * Partitions : No unifier. Merging these sketches is non-trivial.
    + *
    +
    + */
    +@OperatorAnnotation(partitionable = false)
    +public class QuantilesEstimator extends BaseOperator
    +{
    +  /**
    +   * Constructor that allows non-default initialization of the quantile sketch object
    +   *
    +   * @param k:Parameter that determines accuracy and memory usage of quantile sketch. See QuantilesSketch
    +   * documentation for details
    +   * @param seed: The quantile sketch algorithm is inherently random. Set seed to 0 for reproducibility in testing, but
    +   * do not set otherwise.
    +   */
    +  public QuantilesEstimator(int k, short seed)
    +  {
    +    quantilesSketch = QuantilesSketch.builder().setSeed(seed).build(k);
    +  }
    +
    +  private transient QuantilesSketch quantilesSketch = QuantilesSketch.builder().build();
    +
    +  /**
    +   * This field determines the specific quantiles to be calculated. For a stream of numbers, the quantile at a value
    +   * 0 <= p <= 1 is the number x such that a fraction p of the numbers in the sorted stream are less than x. E.g., the
    +   * quantile at p = 0.5 is the median (half the numbers in the stream are less than the median).
    +   * The default is set to compute the standard quartiles (4-quantiles).
    +   */
    +  private double[] fractions = {0.0, 0.25, 0.50, 0.75, 1.00};
    +  /**
    +   * This field determines the intervals on which the probability mass function is computed.
    +   */
    +  private double[] pmfIntervals = {};
    +
    +  /**
    +   * This operator computes three different quantities which are output on separate output ports. If not using any of
    +   * these quantities, these variables can be set to avoid unnecessary computation.
    +   */
    +  private boolean computeCdf = true;
    +  private boolean computeQuantiles = true;
    +  private boolean computePmf = true;
    +
    +  public boolean isComputeCdf()
    +  {
    +    return computeCdf;
    +  }
    +
    +  public void setComputeCdf(boolean computeCdf)
    --- End diff --

    Better to add Javadoc on the getters and setters; users will access the fields through these.

> Quantiles sketch operator
> -------------------------
>
>                 Key: APEXMALHAR-2094
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2094
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>            Reporter: Sandeep Narayanaswami
>            Assignee: Sandeep Narayanaswami
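
As a rough illustration of the suggestion above, the Javadoc can move from the private field to the accessor pair, since the accessors are what a user of the operator sees. `QuantileSettings` is a hypothetical holder class used only for this sketch, and the wording of the comments is an assumption rather than text from the pull request.

```java
// Hypothetical holder class; only the accessor documentation style is the point here.
class QuantileSettings
{
  private boolean computeCdf = true;

  /**
   * Indicates whether the cumulative distribution function is computed and emitted.
   *
   * @return true if the CDF is computed, false if that computation is skipped
   */
  public boolean isComputeCdf()
  {
    return computeCdf;
  }

  /**
   * Enables or disables computation of the cumulative distribution function.
   * Disabling it avoids unnecessary work when the CDF output is not used.
   *
   * @param computeCdf true to compute the CDF, false to skip it
   */
  public void setComputeCdf(boolean computeCdf)
  {
    this.computeCdf = computeCdf;
  }
}
```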
Re: A proposal for Malhar
I wanted to close the loop on this discussion. In general, everyone seemed favorable to this idea, with no serious objections. Folks had good suggestions, such as documenting the capabilities of operators, coming up with well-defined criteria for graduation of operators, and deciding what to do with existing operators that may not yet be mature or are unused. I am going to summarize the key points that resulted from the discussion and would like to proceed with them.

- Operators that do not yet provide the key platform capabilities that make an operator useful across different applications, such as reusability, partitioning (static or dynamic), idempotency, and exactly-once, will still be accepted as long as they are functionally correct and have unit tests. They will go into a separate module.
- The contrib module was suggested as the place for new contributions that don't yet have all the platform capabilities and are not yet mature. If there are no other suggestions, we will go with this one.
- It was suggested that an operator's documentation list which of the platform capabilities above it currently provides. I will document a structure for this in the contribution guidelines.
- Folks wanted to know what the criteria would be to graduate an operator to the big leagues :). I will kick off a separate thread for it, as I think it requires its own discussion, and hopefully we can come up with a set of guidelines.
- David brought up the state of some of the existing operators, their retirement, and the layout of operators in Malhar in general and how it causes problems with development. I will ask him to lead the discussion on that.

Thanks

On Fri, May 27, 2016 at 7:47 PM, David Yan wrote:

> The two ideas are not conflicting, but rather complementing.
>
> On the contrary, putting a new process for people trying to contribute
> while NOT addressing the old unused subpar operators in the repository is
> what is conflicting.
>
> Keep in mind that when people try to contribute, they always look at the
> existing operators already in the repository as examples and likely a model
> for their new operators.
>
> David
>
>
> On Fri, May 27, 2016 at 4:05 PM, Amol Kekre wrote:
>
> > Yes there are two conflicting threads now. The original thread was to open
> > up a way for contributors to submit code in a dir (contrib?) as long as
> > license part of taken care of.
> >
> > On the thread of removing non-used operators -> How do we know what is
> > being used?
> >
> > Thks,
> > Amol
> >
> >
> > On Fri, May 27, 2016 at 3:40 PM, Sandesh Hegde
> > wrote:
> >
> > > +1 for removing the not-used operators.
> > >
> > > So we are creating a process for operator writers who don't want to
> > > understand the platform, yet wants to contribute? How big is that set?
> > > If we tell the app-user, here is the code which has not passed all the
> > > checklist, will they be ready to use that in production?
> > >
> > > This thread has 2 conflicting forces, reduce the operators and make it easy
> > > to add more operators.
> > >
> > >
> > >
> > > On Fri, May 27, 2016 at 3:03 PM Pramod Immaneni <pra...@datatorrent.com>
> > > wrote:
> > >
> > > > On Fri, May 27, 2016 at 2:30 PM, Gaurav Gupta <gaurav.gopi...@gmail.com>
> > > > wrote:
> > > >
> > > > > Pramod,
> > > > >
> > > > > By that logic I would say let's put all partitionable operators into one
> > > > > folder, non-partitionable operators in another and so on...
> > > > >
> > > >
> > > > Remember the original goal of making it easier for new members to
> > > > contribute and managing those contributions to maturity. It is not a
> > > > functional level separation.
> > > >
> > > >
> > > > > When I look at hadoop code I see these annotations being used at class
> > > > > level and not at package/folder level.
> > > >
> > > >
> > > > I had a typo in my email, I meant to say "think of this like a folder..."
> > > > as an analogy and not literally.
> > > >
> > > > Thanks
> > > >
> > > >
> > > > > Thanks
> > > > >
> > > > > On Fri, May 27, 2016 at 2:10 PM, Pramod Immaneni <pra...@datatorrent.com>
> > > > > wrote:
> > > > >
> > > > > > On Fri, May 27, 2016 at 1:05 PM, Gaurav Gupta <gaurav.gopi...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Can same goal not be achieved by using
> > > > > > > org.apache.hadoop.classification.InterfaceStability.Evolving /
> > > > > > > org.apache.hadoop.classification.InterfaceStability.Unstable annotation?
> > > > > > >
> > > > > >
> > > > > > I think it is important to localize the additions in one place so that it
> > > > > > becomes clearer to users about the maturity level of these, easier for
> > > > > > developers to track them towards the path to maturity and also provides a
> > > > > > clearer directive
[GitHub] apex-malhar pull request #301: [APEXMALHAR-2094] Quantiles operator
Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/301#discussion_r66152395

    --- Diff: sketches/src/main/java/org/apache/apex/malhar/sketches/QuantilesEstimator.java ---
    @@ -0,0 +1,188 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.sketches;
    +
    +import com.yahoo.sketches.quantiles.QuantilesSketch;
    +
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.annotation.OperatorAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * An implementation of BaseOperator that computes a "sketch" (a representation of the probability distribution using
    + * a low memory footprint) of the incoming numeric data, and evaluates/outputs the cumulative distribution function and
    + * quantiles of the probability distribution. Leverages the quantiles sketch implementation from the Yahoo Datasketches
    + * Library.
    + *
    + * Input Port(s) :
    + * data : Data values input port.
    + *
    + * Output Port(s) :
    + * cdfOutput : cumulative distribution function output port.
    + * quantilesOutput : quantiles output port.
    + *
    + * Partitions : No unifier. Merging these sketches is non-trivial.
    + *
    +
    + */
    +@OperatorAnnotation(partitionable = false)
    +public class QuantilesEstimator extends BaseOperator
    +{
    +  /**
    +   * Constructor that allows non-default initialization of the quantile sketch object
    +   *
    +   * @param k:Parameter that determines accuracy and memory usage of quantile sketch. See QuantilesSketch
    +   * documentation for details
    +   * @param seed: The quantile sketch algorithm is inherently random. Set seed to 0 for reproducibility in testing, but
    +   * do not set otherwise.
    +   */
    +  public QuantilesEstimator(int k, short seed)
    +  {
    +    quantilesSketch = QuantilesSketch.builder().setSeed(seed).build(k);
    +  }
    +
    +  private transient QuantilesSketch quantilesSketch = QuantilesSketch.builder().build();
    +
    +  /**
    +   * This field determines the specific quantiles to be calculated. For a stream of numbers, the quantile at a value
    +   * 0 <= p <= 1 is the number x such that a fraction p of the numbers in the sorted stream are less than x. E.g., the
    +   * quantile at p = 0.5 is the median (half the numbers in the stream are less than the median).
    +   * The default is set to compute the standard quartiles (4-quantiles).
    +   */
    +  private double[] fractions = {0.0, 0.25, 0.50, 0.75, 1.00};
    +  /**
    +   * This field determines the intervals on which the probability mass function is computed.
    +   */
    +  private double[] pmfIntervals = {};
    +
    +  /**
    +   * This operator computes three different quantities which are output on separate output ports. If not using any of
    +   * these quantities, these variables can be set to avoid unnecessary computation.
    +   */
    +  private boolean computeCdf = true;
    +  private boolean computeQuantiles = true;
    +  private boolean computePmf = true;
    +
    +  public boolean isComputeCdf()
    +  {
    +    return computeCdf;
    +  }
    +
    +  public void setComputeCdf(boolean computeCdf)
    +  {
    +    this.computeCdf = computeCdf;
    +  }
    +
    +  public boolean isComputeQuantiles()
    +  {
    +    return computeQuantiles;
    +  }
    +
    +  public void setComputeQuantiles(boolean computeQuantiles)
    +  {
    +    this.computeQuantiles = computeQuantiles;
    +  }
    +
    +  public boolean isComputePmf()
    +  {
    +    return computePmf;
    +  }
    +
    +  public void setComputePmf(boolean computePmf)
    +  {
    +    this.computePmf = computePmf;
    +  }
    +
    +  public int getK()
    +  {
    +    return quantilesSketch.getK();
    +  }
    +
    +  public double[] getFractions()
    +  {
    +    return fractions;
    +  }
    +
    +  public void setFractions(double[]
[jira] [Commented] (APEXMALHAR-2094) Quantiles sketch operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319394#comment-15319394 ]

ASF GitHub Bot commented on APEXMALHAR-2094:

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/301#discussion_r66150788

    --- Diff: sketches/src/main/java/org/apache/apex/malhar/sketches/QuantilesEstimator.java ---
    @@ -0,0 +1,188 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.sketches;
    +
    +import com.yahoo.sketches.quantiles.QuantilesSketch;
    +
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.annotation.OperatorAnnotation;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +/**
    + * An implementation of BaseOperator that computes a "sketch" (a representation of the probability distribution using
    + * a low memory footprint) of the incoming numeric data, and evaluates/outputs the cumulative distribution function and
    + * quantiles of the probability distribution. Leverages the quantiles sketch implementation from the Yahoo Datasketches
    + * Library.
    + *
    + * Input Port(s) :
    + * data : Data values input port.
    + *
    + * Output Port(s) :
    + * cdfOutput : cumulative distribution function output port.
    + * quantilesOutput : quantiles output port.
    + *
    + * Partitions : No unifier. Merging these sketches is non-trivial.
    + *
    +
    + */
    +@OperatorAnnotation(partitionable = false)
    +public class QuantilesEstimator extends BaseOperator
    +{
    +  /**
    +   * Constructor that allows non-default initialization of the quantile sketch object
    +   *
    +   * @param k:Parameter that determines accuracy and memory usage of quantile sketch. See QuantilesSketch
    +   * documentation for details
    +   * @param seed: The quantile sketch algorithm is inherently random. Set seed to 0 for reproducibility in testing, but
    +   * do not set otherwise.
    +   */
    +  public QuantilesEstimator(int k, short seed)
    +  {
    +    quantilesSketch = QuantilesSketch.builder().setSeed(seed).build(k);
    +  }
    +
    +  private transient QuantilesSketch quantilesSketch = QuantilesSketch.builder().build();
    +
    +  /**
    +   * This field determines the specific quantiles to be calculated. For a stream of numbers, the quantile at a value
    +   * 0 <= p <= 1 is the number x such that a fraction p of the numbers in the sorted stream are less than x. E.g., the
    +   * quantile at p = 0.5 is the median (half the numbers in the stream are less than the median).
    +   * The default is set to compute the standard quartiles (4-quantiles).
    +   */
    +  private double[] fractions = {0.0, 0.25, 0.50, 0.75, 1.00};
    +  /**
    +   * This field determines the intervals on which the probability mass function is computed.
    +   */
    +  private double[] pmfIntervals = {};
    +
    +  /**
    +   * This operator computes three different quantities which are output on separate output ports. If not using any of
    +   * these quantities, these variables can be set to avoid unnecessary computation.
    +   */
    +  private boolean computeCdf = true;
    --- End diff --

    You can avoid having these boolean properties by simply checking to see if the corresponding output port is connected and only then do the needed computation.
    Example:
    ```
    if(cdfOutput.isConnected()) {
      // do processing
    }
    ```

> Quantiles sketch operator
> -------------------------
>
>                 Key: APEXMALHAR-2094
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2094
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>            Reporter: Sandeep Narayanaswami
>            Assignee: Sandeep Narayanaswami
>            Priority: Minor
>
> An operator that "sketches" in an online fashion the probability
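
The reviewer's suggestion can be expanded into a small runnable sketch. To keep it self-contained it uses `StubOutputPort`, a hypothetical stand-in that models only the `isConnected()` and `emit()` behaviour relied on here; it is not the Apex `DefaultOutputPort` class. `ConnectedPortDemo`, the `endWindow()` placement and the emitted value are likewise assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal stand-in for an output port; models only connection state and emitted tuples.
class StubOutputPort<T>
{
  private boolean connected;
  private final List<T> emitted = new ArrayList<>();

  void setConnected(boolean connected)
  {
    this.connected = connected;
  }

  boolean isConnected()
  {
    return connected;
  }

  void emit(T tuple)
  {
    emitted.add(tuple);
  }

  List<T> getEmitted()
  {
    return emitted;
  }
}

// Hypothetical operator fragment: the computation is guarded by the port's connection state
// instead of a separate boolean property.
class ConnectedPortDemo
{
  final StubOutputPort<Double> cdfOutput = new StubOutputPort<>();
  int cdfComputations;

  void endWindow()
  {
    if (cdfOutput.isConnected()) {
      cdfComputations++;       // counts how often the (potentially expensive) work runs
      cdfOutput.emit(0.5);     // placeholder value standing in for a computed result
    }
  }

  public static void main(String[] args)
  {
    ConnectedPortDemo op = new ConnectedPortDemo();
    op.endWindow();
    op.cdfOutput.setConnected(true);
    op.endWindow();
    System.out.println("computations: " + op.cdfComputations);
  }
}
```

With this guard, whether a quantity is computed follows from how the application wires the ports, so there is no separate flag to keep in sync.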
[GitHub] apex-malhar pull request #301: [APEXMALHAR-2094] Quantiles operator
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/301#discussion_r66150788 --- Diff: sketches/src/main/java/org/apache/apex/malhar/sketches/QuantilesEstimator.java --- @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.sketches; + +import com.yahoo.sketches.quantiles.QuantilesSketch; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.common.util.BaseOperator; + +/** + * An implementation of BaseOperator that computes a "sketch" (a representation of the probability distribution using + * a low memory footprint) of the incoming numeric data, and evaluates/outputs the cumulative distribution function and + * quantiles of the probability distribution. Leverages the quantiles sketch implementation from the Yahoo Datasketches + * Library. + * + * Input Port(s) : + * data : Data values input port. + * + * Output Port(s) : + * cdfOutput : cumulative distribution function output port. + * quantilesOutput : quantiles output port. + * + * Partitions : No unifier. 
Merging these sketches is non-trivial. + * + + */ +@OperatorAnnotation(partitionable = false) +public class QuantilesEstimator extends BaseOperator +{ + /** + * Constructor that allows non-default initialization of the quantile sketch object + * + * @param k:Parameter that determines accuracy and memory usage of quantile sketch. See QuantilesSketch + * documentation for details + * @param seed: The quantile sketch algorithm is inherently random. Set seed to 0 for reproducibility in testing, but + * do not set otherwise. + */ + public QuantilesEstimator(int k, short seed) + { +quantilesSketch = QuantilesSketch.builder().setSeed(seed).build(k); + } + + private transient QuantilesSketch quantilesSketch = QuantilesSketch.builder().build(); + + /** + * This field determines the specific quantiles to be calculated. For a stream of numbers, the quantile at a value + * 0 <= p <= 1 is the number x such that a fraction p of the numbers in the sorted stream are less than x. E.g., the + * quantile at p = 0.5 is the median (half the numbers in the stream are less than the median). + * The default is set to compute the standard quartiles (4-quantiles). + */ + private double[] fractions = {0.0, 0.25, 0.50, 0.75, 1.00}; + /** + * This field determines the intervals on which the probability mass function is computed. + */ + private double[] pmfIntervals = {}; + + /** + * This operator computes three different quantities which are output on separate output ports. If not using any of + * these quantities, these variables can be set to avoid unnecessary computation. + */ + private boolean computeCdf = true; --- End diff -- You can avoid having these boolean properties by simply checking to see if the corresponding output port is connected and only then do the needed computation. Example: ```if(cdfOutput.isConnected()) { // do processing }```
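For illustration, the reviewer's suggestion can be sketched without the Apex dependencies. `SketchPort` and `PortGatedEstimator` below are hypothetical stand-ins, not classes from Apex or from the pull request; only the `isConnected()` check mirrors the example in the comment, and the emitted arrays are placeholders for the real CDF and quantile computations.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an output port; this is NOT the Apex DefaultOutputPort. */
class SketchPort<T>
{
  private final boolean connected;
  final List<T> emitted = new ArrayList<>();

  SketchPort(boolean connected)
  {
    this.connected = connected;
  }

  boolean isConnected()
  {
    return connected;
  }

  void emit(T tuple)
  {
    emitted.add(tuple);
  }
}

/** Toy operator that does the work for a port only when something is wired to it. */
class PortGatedEstimator
{
  final SketchPort<double[]> cdfOutput;
  final SketchPort<double[]> quantilesOutput;
  int computations = 0; // counts how many per-port computations actually ran

  PortGatedEstimator(SketchPort<double[]> cdfOutput, SketchPort<double[]> quantilesOutput)
  {
    this.cdfOutput = cdfOutput;
    this.quantilesOutput = quantilesOutput;
  }

  void process(double value)
  {
    // No boolean property needed: the port's wiring decides whether to compute.
    if (cdfOutput.isConnected()) {
      computations++;
      cdfOutput.emit(new double[] {value}); // placeholder for the real CDF values
    }
    if (quantilesOutput.isConnected()) {
      computations++;
      quantilesOutput.emit(new double[] {value}); // placeholder for the real quantiles
    }
  }
}
```

With this shape, an application that leaves a port unconnected skips the corresponding computation without setting any extra property.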
[jira] [Commented] (APEXMALHAR-2094) Quantiles sketch operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319369#comment-15319369 ] ASF GitHub Bot commented on APEXMALHAR-2094: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/301#discussion_r66149667 --- Diff: sketches/src/main/java/org/apache/apex/malhar/sketches/QuantilesEstimator.java --- @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.sketches; + +import com.yahoo.sketches.quantiles.QuantilesSketch; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.common.util.BaseOperator; + +/** + * An implementation of BaseOperator that computes a "sketch" (a representation of the probability distribution using + * a low memory footprint) of the incoming numeric data, and evaluates/outputs the cumulative distribution function and + * quantiles of the probability distribution. Leverages the quantiles sketch implementation from the Yahoo Datasketches + * Library. 
+ * + * Input Port(s) : + * data : Data values input port. + * + * Output Port(s) : + * cdfOutput : cumulative distribution function output port. + * quantilesOutput : quantiles output port. + * + * Partitions : No unifier. Merging these sketches is non-trivial. + * + + */ +@OperatorAnnotation(partitionable = false) +public class QuantilesEstimator extends BaseOperator +{ + /** + * Constructor that allows non-default initialization of the quantile sketch object + * + * @param k:Parameter that determines accuracy and memory usage of quantile sketch. See QuantilesSketch --- End diff -- A link to the documentation would be helpful > Quantiles sketch operator > - > > Key: APEXMALHAR-2094 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2094 > Project: Apache Apex Malhar > Issue Type: New Feature >Reporter: Sandeep Narayanaswami >Assignee: Sandeep Narayanaswami >Priority: Minor > > An operator that "sketches" in an online fashion the probability distribution > of an input (numeric) data stream, enabling computation of quantiles and > cumulative distribution functions. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: APC
Yeah I tried running 3.4.0 on it and was greeted with a nice error message, had to roll back to 3.3

Thanks,
Tim

On Tue, Jun 7, 2016 at 1:38 PM, Munagala Ramanath wrote:
> Also, last I looked, the sandbox was still 3.3.1
>
> Ram
>
> On Tue, Jun 7, 2016 at 1:29 PM, Timothy Farkas <timothytiborfar...@gmail.com> wrote:
> > Thanks Ram, will take a note.
> >
> > Tim
> >
> > On Tue, Jun 7, 2016 at 1:27 PM, Munagala Ramanath wrote:
> > > The archetype command for each version is in the
> > > *apex-app-archetype/README.md* file of the appropriate branch.
> > > Here is the script I currently use:
> > >
> > > #!/bin/bash
> > >
> > > #v='3.4.0'
> > > v='3.5.0-SNAPSHOT'
> > > mvn archetype:generate -B \
> > >   -DarchetypeGroupId=org.apache.apex \
> > >   -DarchetypeArtifactId=apex-app-archetype \
> > >   -DarchetypeVersion="$v" \
> > >   -DgroupId=com.example \
> > >   -Dpackage=com.example.myapexapp \
> > >   -DartifactId=myapexapp \
> > >   -Dversion=1.0-SNAPSHOT
> > >
> > > Ram
> > >
> > > On Tue, Jun 7, 2016 at 1:20 PM, Timothy Farkas <timothytiborfar...@gmail.com> wrote:
> > > > Ah Thanks Ram. I did a quick google for the archetype, and this came up
> > > >
> > > > https://github.com/apache/apex-core/tree/master/apex-conf-archetype
> > > >
> > > > I then blindly copied the command, which is for config packages. Silly me :).
> > > > Do you have a link to the correct apa archetype command for future reference?
> > > >
> > > > Thanks,
> > > > Tim
> > > >
> > > > On Tue, Jun 7, 2016 at 1:15 PM, Munagala Ramanath <r...@datatorrent.com> wrote:
> > > > > Tim,
> > > > >
> > > > > Are you building a config package by any chance, since that builds an *apc* file.
> > > > > Can you post the exact archetype command you ran ?
> > > > >
> > > > > I just ran the archetype command for 3.4.0 and 3.5.0-SNAPSHOT and it built
> > > > > an *apa* just as it always does.
> > > > >
> > > > > Ram
> > > > >
> > > > > On Tue, Jun 7, 2016 at 12:44 PM, Timothy Farkas <timothytiborfar...@gmail.com> wrote:
> > > > > > Hi All,
> > > > > >
> > > > > > I noticed that the new project template generates a apc, doesn't seem to
> > > > > > launch in the sandbox I just downloaded. Can I get some hints about what
> > > > > > version works with what, and what the differences between an apc and the
> > > > > > old apa are?
> > > > > >
> > > > > > Thanks!
> > > > > > Tim
[jira] [Commented] (APEXMALHAR-2106) Support merging multiple streams with StreamMerger
[ https://issues.apache.org/jira/browse/APEXMALHAR-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319332#comment-15319332 ] ASF GitHub Bot commented on APEXMALHAR-2106: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/309#discussion_r66146875 --- Diff: library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java --- @@ -59,6 +61,7 @@ public void process(K tuple) /** * Data input port 2. */ + @InputPortFieldAnnotation(optional = true) --- End diff -- Same here > Support merging multiple streams with StreamMerger > --- > > Key: APEXMALHAR-2106 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2106 > Project: Apache Apex Malhar > Issue Type: New Feature >Reporter: Ilya Ganelin >Assignee: Ilya Ganelin > > To properly implement the Flatten transformation (and other Stream > combination operations), Apex must support merging data from multiple > sources. The StreamMerger operator can be improved to merge multiple streams, > rather than just the two streams it can handle in the present implementation. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXMALHAR-2106) Support merging multiple streams with StreamMerger
[ https://issues.apache.org/jira/browse/APEXMALHAR-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319330#comment-15319330 ] ASF GitHub Bot commented on APEXMALHAR-2106: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/309#discussion_r66146828 --- Diff: library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java --- @@ -44,6 +45,7 @@ /** * Data input port 1. */ + @InputPortFieldAnnotation(optional = true) --- End diff -- This won't be needed if we don't use mergers with just one input. See comment above on number of mergers. > Support merging multiple streams with StreamMerger > --- > > Key: APEXMALHAR-2106 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2106 > Project: Apache Apex Malhar > Issue Type: New Feature >Reporter: Ilya Ganelin >Assignee: Ilya Ganelin > > To properly implement the Flatten transformation (and other Stream > combination operations), Apex must support merging data from multiple > sources. The StreamMerger operator can be improved to merge multiple streams, > rather than just the two streams it can handle in the present implementation. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (APEXMALHAR-2085) Implement Windowed Operators
[ https://issues.apache.org/jira/browse/APEXMALHAR-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Yan reassigned APEXMALHAR-2085: - Assignee: David Yan (was: Siyuan Hua) > Implement Windowed Operators > > > Key: APEXMALHAR-2085 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2085 > Project: Apache Apex Malhar > Issue Type: New Feature >Reporter: Siyuan Hua >Assignee: David Yan > > As per our recent several discussions in the community. A group of Windowed > Operators that delivers the window semantic follows the google Data Flow > model(https://cloud.google.com/dataflow/) is very important. > The operators should be designed and implemented in a way for > High-level API > Beam translation > Easy to use with other popular operator > {panel:title=Operator Hierarchy} > Hierarchy of the operators, > The windowed operators should cover all possible transformations that require > window, and batch processing is also considered as special window called > global window > {code} >+---+ >+-> | WindowedOperator | <+ >| ++--+ | >|^ ^+ >|| | | >|| | | > +--+++--+--+ +---+-++--+-+ > |CombineOperator||GroupOperator| |KeyedOperator||JoinOperator| > +---++-+ +--+--++-+--+ >+-^ ^ ^ >| | | > ++---+ +-++ +++ > |KeyedCombine| |KeyedGroup| | CoGroup | > ++ +--+ +-+ > {code} > Combine operation includes all operations that combine all tuples in one > window into one or small number of tuples, Group operation group all tuples > in one window, Join and CoGroup are used to join and group tuples from > different inputs. > {panel} > {panel:title=Components} > * Window Component > It includes configuration, window state that should be checkpointed, etc. 
It > should support NonMergibleWindow(fixed or slide) MergibleWindow(Session) > * Trigger > It should support early trigger, late trigger with customizable trigger > behaviour > * Other related components: > ** Watermark generator, can be plugged into input source to generate watermark > ** Tuple schema support: > It should handle either predefined tuple type or give a declarative API to > describe the user defined tuple class > {panel} > Most component API should be reused in High-Level API > This is the umbrella ticket, separate tickets would be created for different > components and operators respectively -- This message was sent by Atlassian JIRA (v6.3.4#6332)
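As a rough illustration of the non-mergeable (fixed and sliding) windows mentioned in the ticket, the sketch below assigns an event timestamp to its window or windows. It is not taken from the ticket or from any Apex code: the class name `WindowAssignSketch`, both method names, and the convention that windows are half-open intervals `[start, start + size)` aligned to multiples of the size or slide are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper showing window assignment only; not part of Apex or Malhar. */
class WindowAssignSketch
{
  /** Start of the fixed window of length sizeMillis that contains timestampMillis. */
  static long fixedWindowStart(long timestampMillis, long sizeMillis)
  {
    // floorMod keeps the result correct for negative timestamps as well
    return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
  }

  /**
   * Starts of every sliding window [start, start + sizeMillis) that contains the
   * timestamp, assuming windows begin at multiples of slideMillis. Latest start first.
   */
  static List<Long> slidingWindowStarts(long timestampMillis, long sizeMillis, long slideMillis)
  {
    List<Long> starts = new ArrayList<>();
    long lastStart = timestampMillis - Math.floorMod(timestampMillis, slideMillis);
    // Walk backwards while the window beginning at 'start' still covers the timestamp
    for (long start = lastStart; start > timestampMillis - sizeMillis; start -= slideMillis) {
      starts.add(start);
    }
    return starts;
  }
}
```

A fixed window is the special case where the slide equals the size, so each timestamp lands in exactly one window. Session (mergeable) windows need per-key state and are not covered by this sketch.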
[jira] [Commented] (APEXMALHAR-2106) Support merging multiple streams with StreamMerger
[ https://issues.apache.org/jira/browse/APEXMALHAR-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319288#comment-15319288 ] ASF GitHub Bot commented on APEXMALHAR-2106: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/309#discussion_r66144530 --- Diff: library/src/main/java/com/datatorrent/lib/stream/MultipleStreamMerger.java --- @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.stream; + +import java.util.ArrayList; +import java.util.Iterator; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Module; + +/** + * Module that adds functionality to bypass the platform limitations of combining more than two streams at a time with + * Stream Merger. 
+ * + * Usage: + * + * dag.addOperator("Stream_1", op1); + * dag.addOperator("Stream_2", op2); + * dag.addOperator("Stream_3", op3); + * + * MultipleStreamMerger merger = new MultipleStreamMerger(); + * merger.merge(op1.out) + * .merge(op2.out) + * .merge(op3.out) + * .insertInto(dag, conf); + * + * dag.addModule("Merger", merger); + * + * @param + */ +public class MultipleStreamMerger implements Module +{ + public class Stream + { +DefaultInputPort destPort; +DefaultOutputPort sourcePort; +String name; + +public Stream(String name, DefaultOutputPort sourcePort, DefaultInputPort destPort) +{ + this.destPort = destPort; + this.sourcePort = sourcePort; + this.name = name; +} + } + + public class NamedMerger + { +StreamMerger merger; +String name; + +public NamedMerger(String name, StreamMerger merger) +{ + this.merger = merger; + this.name = name; +} + } + + private int streamCount = 0; + + ArrayListstreamsToMerge = new ArrayList<>(); + + public transient ProxyOutputPort streamOutput = new ProxyOutputPort<>(); + + /** + * Used to define all the sources to be merged into a single stream. + * + * @param sourcePort - The output port from the upstream operator that provides data + * @return The updated MultipleStreamMerger object that tracks which streams should be unified. + */ + public MultipleStreamMerger merge(DefaultOutputPort sourcePort) + { +streamsToMerge.add(sourcePort); +return this; + } + + /** + * To merge more than two streams at a time, we construct a binary tree of thread-local StreamMerger operators + * E.g. + * + * Tier 0 Tier 1 Tier 2 + * + * Stream 1 -> + * StreamMerger_1 -> + * Stream 2 -> + * StreamMerger_Final -> Out + * Stream 3 -> + * StreamMerger_2 -> + * Stream 4 -> + * + * This function updates the provided DAG with the relevant streams. 
+ */ + public void mergeStreams(DAG dag, Configuration conf) + { +if (streamsToMerge.size() < 2) { + throw new IllegalArgumentException("Not enough streams to merge, at least two streams must be selected for " + + "merging with `.merge()`."); +} + +ArrayList streamsToAddToDag = new ArrayList<>(); +ArrayList operatorsToAdd = new ArrayList<>(); + +// Determine operators and streams to add to the DAG +constructMergeTree(streamsToAddToDag, operatorsToAdd); + +for (NamedMerger m : operatorsToAdd) { + dag.addOperator(m.name, m.merger); +} + +for (Stream s : streamsToAddToDag) { + dag.addStream(s.name, s.sourcePort,
[GitHub] apex-malhar pull request #309: [APEXMALHAR-2106][WIP] Support multiple strea...
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/309#discussion_r66144530 --- Diff: library/src/main/java/com/datatorrent/lib/stream/MultipleStreamMerger.java --- @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.stream; + +import java.util.ArrayList; +import java.util.Iterator; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Module; + +/** + * Module that adds functionality to bypass the platform limitations of combining more than two streams at a time with + * Stream Merger. 
+ * + * Usage: + * + * dag.addOperator("Stream_1", op1); + * dag.addOperator("Stream_2", op2); + * dag.addOperator("Stream_3", op3); + * + * MultipleStreamMerger merger = new MultipleStreamMerger(); + * merger.merge(op1.out) + * .merge(op2.out) + * .merge(op3.out) + * .insertInto(dag, conf); + * + * dag.addModule("Merger", merger); + * + * @param + */ +public class MultipleStreamMerger implements Module +{ + public class Stream + { +DefaultInputPort destPort; +DefaultOutputPort sourcePort; +String name; + +public Stream(String name, DefaultOutputPort sourcePort, DefaultInputPort destPort) +{ + this.destPort = destPort; + this.sourcePort = sourcePort; + this.name = name; +} + } + + public class NamedMerger + { +StreamMerger merger; +String name; + +public NamedMerger(String name, StreamMerger merger) +{ + this.merger = merger; + this.name = name; +} + } + + private int streamCount = 0; + + ArrayListstreamsToMerge = new ArrayList<>(); + + public transient ProxyOutputPort streamOutput = new ProxyOutputPort<>(); + + /** + * Used to define all the sources to be merged into a single stream. + * + * @param sourcePort - The output port from the upstream operator that provides data + * @return The updated MultipleStreamMerger object that tracks which streams should be unified. + */ + public MultipleStreamMerger merge(DefaultOutputPort sourcePort) + { +streamsToMerge.add(sourcePort); +return this; + } + + /** + * To merge more than two streams at a time, we construct a binary tree of thread-local StreamMerger operators + * E.g. + * + * Tier 0 Tier 1 Tier 2 + * + * Stream 1 -> + * StreamMerger_1 -> + * Stream 2 -> + * StreamMerger_Final -> Out + * Stream 3 -> + * StreamMerger_2 -> + * Stream 4 -> + * + * This function updates the provided DAG with the relevant streams. 
+ */ + public void mergeStreams(DAG dag, Configuration conf) + { +if (streamsToMerge.size() < 2) { + throw new IllegalArgumentException("Not enough streams to merge, at least two streams must be selected for " + + "merging with `.merge()`."); +} + +ArrayList streamsToAddToDag = new ArrayList<>(); +ArrayList operatorsToAdd = new ArrayList<>(); + +// Determine operators and streams to add to the DAG +constructMergeTree(streamsToAddToDag, operatorsToAdd); + +for (NamedMerger m : operatorsToAdd) { + dag.addOperator(m.name, m.merger); +} + +for (Stream s : streamsToAddToDag) { + dag.addStream(s.name, s.sourcePort, s.destPort).setLocality(DAG.Locality.CONTAINER_LOCAL); +} + } + + /** + * Given a set of streams to be merged (defined via {@link #merge(DefaultOutputPort)}), compute the optimal + * structure of cascading mergers that need to be instantiated, added
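The tiered structure described in the javadoc above can be sketched independently of the DAG API. The code below is not the pull request's `constructMergeTree`; `MergeTreeSketch`, `PlannedMerger`, and the `Merger_T<tier>_<index>` naming are hypothetical. It also assumes that an unpaired stream is simply carried up to the next tier rather than being fed through a one-input merger.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical planner for a cascade of two-input mergers; not the code from the PR. */
class MergeTreeSketch
{
  /** One planned two-input merger: its name and the two upstream sources feeding it. */
  static final class PlannedMerger
  {
    final String name;
    final String left;
    final String right;

    PlannedMerger(String name, String left, String right)
    {
      this.name = name;
      this.left = left;
      this.right = right;
    }
  }

  /** Pairs sources tier by tier until a single output remains; N sources need N - 1 mergers. */
  static List<PlannedMerger> plan(List<String> sources)
  {
    if (sources.size() < 2) {
      throw new IllegalArgumentException("At least two streams are required for merging.");
    }
    List<PlannedMerger> mergers = new ArrayList<>();
    List<String> tier = new ArrayList<>(sources);
    int tierIndex = 1;
    while (tier.size() > 1) {
      List<String> next = new ArrayList<>();
      for (int i = 0; i + 1 < tier.size(); i += 2) {
        String name = "Merger_T" + tierIndex + "_" + (i / 2);
        mergers.add(new PlannedMerger(name, tier.get(i), tier.get(i + 1)));
        next.add(name); // this merger's output becomes an input of the next tier
      }
      if (tier.size() % 2 == 1) {
        next.add(tier.get(tier.size() - 1)); // odd stream out moves up unchanged
      }
      tier = next;
      tierIndex++;
    }
    return mergers;
  }
}
```

The last element of the returned list is the final merger, whose output would be exposed as the module's output. Carrying the odd stream upward means every planned merger has both inputs connected.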
Re: APC
Tim,

Are you building a config package by any chance, since that builds an *apc* file.
Can you post the exact archetype command you ran ?

I just ran the archetype command for 3.4.0 and 3.5.0-SNAPSHOT and it built
an *apa* just as it always does.

Ram

On Tue, Jun 7, 2016 at 12:44 PM, Timothy Farkas <timothytiborfar...@gmail.com> wrote:
> Hi All,
>
> I noticed that the new project template generates a apc, doesn't seem to
> launch in the sandbox I just downloaded. Can I get some hints about what
> version works with what, and what the differences between an apc and the
> old apa are?
>
> Thanks!
> Tim
Re: Proposal : DAG - SetOperatorAttribute
I have created a Jira, will start working on this.

https://issues.apache.org/jira/browse/APEXCORE-470

On Tue, Jun 7, 2016 at 12:21 PM Munagala Ramanath wrote:
> +1
>
> Since we have *setInputPortAttribute* and *setOutputPortAttribute*, it
> seems reasonable to add *setOperatorAttribute*.
>
> Ram
>
> On Mon, Jun 6, 2016 at 1:39 PM, Sandesh Hegde wrote:
> > Currently, *setAttribute* is used to set the operator attributes. Other 2
> > Attribute setting APIs are specific to input ports
> > (*setInputPortAttributes*) and output ports (*setOutputPortsAttributes*).
> >
> > Proposal is to have *SetOperatorAttribute*
> > api, which will clearly indicate that user wants set attributes on the
> > operator.
> > ( setOperatorAttribute(Operator operator, Attribute key, T value) )
> >
> > Following will be the roles for the APIs
> > *setAttributes* --> for setting Attributes for the whole DAG (
> > setAttribute(Operator operator, Attribute key, T value) - can be
> > deprecated )
> > *setOperatorAttributes* --> for setting Attributes for the operator
> >
> > Let me know your thoughts.
> >
> > Thanks
[jira] [Commented] (APEXMALHAR-2106) Support merging multiple streams with StreamMerger
[ https://issues.apache.org/jira/browse/APEXMALHAR-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319260#comment-15319260 ] ASF GitHub Bot commented on APEXMALHAR-2106: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/309#discussion_r66142732 --- Diff: library/src/main/java/com/datatorrent/lib/stream/MultipleStreamMerger.java --- @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.stream; + +import java.util.ArrayList; +import java.util.Iterator; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Module; + +/** + * Module that adds functionality to bypass the platform limitations of combining more than two streams at a time with + * Stream Merger. 
+ * + * Usage: + * + * dag.addOperator("Stream_1", op1); + * dag.addOperator("Stream_2", op2); + * dag.addOperator("Stream_3", op3); + * + * MultipleStreamMerger merger = new MultipleStreamMerger(); + * merger.merge(op1.out) + * .merge(op2.out) + * .merge(op3.out) + * .insertInto(dag, conf); + * + * dag.addModule("Merger", merger); + * + * @param + */ +public class MultipleStreamMerger implements Module +{ + public class Stream + { +DefaultInputPort destPort; +DefaultOutputPort sourcePort; +String name; + +public Stream(String name, DefaultOutputPort sourcePort, DefaultInputPort destPort) +{ + this.destPort = destPort; + this.sourcePort = sourcePort; + this.name = name; +} + } + + public class NamedMerger + { +StreamMerger merger; +String name; + +public NamedMerger(String name, StreamMerger merger) +{ + this.merger = merger; + this.name = name; +} + } + + private int streamCount = 0; + + ArrayListstreamsToMerge = new ArrayList<>(); + + public transient ProxyOutputPort streamOutput = new ProxyOutputPort<>(); + + /** + * Used to define all the sources to be merged into a single stream. + * + * @param sourcePort - The output port from the upstream operator that provides data + * @return The updated MultipleStreamMerger object that tracks which streams should be unified. + */ + public MultipleStreamMerger merge(DefaultOutputPort sourcePort) + { +streamsToMerge.add(sourcePort); +return this; + } + + /** + * To merge more than two streams at a time, we construct a binary tree of thread-local StreamMerger operators + * E.g. + * + * Tier 0 Tier 1 Tier 2 + * + * Stream 1 -> + * StreamMerger_1 -> + * Stream 2 -> + * StreamMerger_Final -> Out + * Stream 3 -> + * StreamMerger_2 -> + * Stream 4 -> + * + * This function updates the provided DAG with the relevant streams. 
+ */ + public void mergeStreams(DAG dag, Configuration conf) + { +if (streamsToMerge.size() < 2) { + throw new IllegalArgumentException("Not enough streams to merge, at least two streams must be selected for " + + "merging with `.merge()`."); +} + +ArrayList streamsToAddToDag = new ArrayList<>(); +ArrayList operatorsToAdd = new ArrayList<>(); + +// Determine operators and streams to add to the DAG +constructMergeTree(streamsToAddToDag, operatorsToAdd); + +for (NamedMerger m : operatorsToAdd) { + dag.addOperator(m.name, m.merger); +} + +for (Stream s : streamsToAddToDag) { + dag.addStream(s.name, s.sourcePort,
[GitHub] apex-malhar pull request #309: [APEXMALHAR-2106][WIP] Support multiple strea...
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/309#discussion_r66142732 --- Diff: library/src/main/java/com/datatorrent/lib/stream/MultipleStreamMerger.java --- @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.stream; + +import java.util.ArrayList; +import java.util.Iterator; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Module; + +/** + * Module that adds functionality to bypass the platform limitations of combining more than two streams at a time with + * Stream Merger. 
+ * + * Usage: + * + * dag.addOperator("Stream_1", op1); + * dag.addOperator("Stream_2", op2); + * dag.addOperator("Stream_3", op3); + * + * MultipleStreamMerger merger = new MultipleStreamMerger(); + * merger.merge(op1.out) + * .merge(op2.out) + * .merge(op3.out) + * .insertInto(dag, conf); + * + * dag.addModule("Merger", merger); + * + * @param + */ +public class MultipleStreamMerger implements Module +{ + public class Stream + { +DefaultInputPort destPort; +DefaultOutputPort sourcePort; +String name; + +public Stream(String name, DefaultOutputPort sourcePort, DefaultInputPort destPort) +{ + this.destPort = destPort; + this.sourcePort = sourcePort; + this.name = name; +} + } + + public class NamedMerger + { +StreamMerger merger; +String name; + +public NamedMerger(String name, StreamMerger merger) +{ + this.merger = merger; + this.name = name; +} + } + + private int streamCount = 0; + + ArrayListstreamsToMerge = new ArrayList<>(); + + public transient ProxyOutputPort streamOutput = new ProxyOutputPort<>(); + + /** + * Used to define all the sources to be merged into a single stream. + * + * @param sourcePort - The output port from the upstream operator that provides data + * @return The updated MultipleStreamMerger object that tracks which streams should be unified. + */ + public MultipleStreamMerger merge(DefaultOutputPort sourcePort) + { +streamsToMerge.add(sourcePort); +return this; + } + + /** + * To merge more than two streams at a time, we construct a binary tree of thread-local StreamMerger operators + * E.g. + * + * Tier 0 Tier 1 Tier 2 + * + * Stream 1 -> + * StreamMerger_1 -> + * Stream 2 -> + * StreamMerger_Final -> Out + * Stream 3 -> + * StreamMerger_2 -> + * Stream 4 -> + * + * This function updates the provided DAG with the relevant streams. 
+ */ + public void mergeStreams(DAG dag, Configuration conf) + { +if (streamsToMerge.size() < 2) { + throw new IllegalArgumentException("Not enough streams to merge, at least two streams must be selected for " + + "merging with `.merge()`."); +} + +ArrayList streamsToAddToDag = new ArrayList<>(); +ArrayList operatorsToAdd = new ArrayList<>(); + +// Determine operators and streams to add to the DAG +constructMergeTree(streamsToAddToDag, operatorsToAdd); + +for (NamedMerger m : operatorsToAdd) { + dag.addOperator(m.name, m.merger); +} + +for (Stream s : streamsToAddToDag) { + dag.addStream(s.name, s.sourcePort, s.destPort).setLocality(DAG.Locality.CONTAINER_LOCAL); +} + } + + /** + * Given a set of streams to be merged (defined via {@link #merge(DefaultOutputPort)}), compute the optimal + * structure of cascading mergers that need to be instantiated, added
[jira] [Created] (APEXCORE-470) New Api for setting the attribute on the operator ( setOperatorAttribute )
Sandesh created APEXCORE-470: Summary: New Api for setting the attribute on the operator ( setOperatorAttribute ) Key: APEXCORE-470 URL: https://issues.apache.org/jira/browse/APEXCORE-470 Project: Apache Apex Core Issue Type: Improvement Reporter: Sandesh Assignee: Sandesh Currently, *setAttribute* is used to set the operator attributes. Other 2 Attribute setting APIs are specific to input ports (*setInputPortAttributes*) and output ports (*setOutputPortsAttributes*). Proposal is to have *SetOperatorAttribute* api, which will clearly indicate that user wants set attributes on the operator. ( setOperatorAttribute(Operator operator, Attribute key, T value) ) Following will be the roles for the APIs *setAttributes* --> for setting Attributes for the whole DAG ( setAttribute(Operator operator, Attribute key, T value) - can be deprecated ) *setOperatorAttributes* --> for setting Attributes for the operator All the unit test cases using the previous API will be renamed as a part of this change. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
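One way to picture the proposal is a deprecated method that forwards to the new, more explicitly named one. The sketch below is only an assumption about how such a transition could look; `SketchDag`, its string-keyed attribute map, and `getOperatorAttribute` are hypothetical and do not reproduce the signatures in `com.datatorrent.api.DAG`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical DAG stand-in; operators and attribute keys are plain strings here. */
class SketchDag
{
  private final Map<String, Map<String, Object>> operatorAttributes = new HashMap<>();

  /** New, explicitly named entry point for operator-level attributes. */
  <T> void setOperatorAttribute(String operatorName, String key, T value)
  {
    operatorAttributes.computeIfAbsent(operatorName, k -> new HashMap<>()).put(key, value);
  }

  /** Old name kept for compatibility; it only forwards to the new method. */
  @Deprecated
  <T> void setAttribute(String operatorName, String key, T value)
  {
    setOperatorAttribute(operatorName, key, value);
  }

  /** Returns the stored value, or null when the operator or key is unknown. */
  Object getOperatorAttribute(String operatorName, String key)
  {
    Map<String, Object> attrs = operatorAttributes.get(operatorName);
    return attrs == null ? null : attrs.get(key);
  }
}
```

Forwarding keeps existing callers working while the compiler's deprecation warning points them to the new name, which fits a rename where tests and applications migrate gradually.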
APC
Hi All, I noticed that the new project template generates a apc, doesn't seem to launch in the sandbox I just downloaded. Can I get some hints about what version works with what, and what the differences between an apc and the old apa are? Thanks! Tim
Re: Proposal : DAG - SetOperatorAttribute
+1

Since we have *setInputPortAttribute* and *setOutputPortAttribute*, it seems reasonable
to add *setOperatorAttribute*.

Ram

On Mon, Jun 6, 2016 at 1:39 PM, Sandesh Hegde wrote:
> Currently, *setAttribute* is used to set the operator attributes. Other 2
> Attribute setting APIs are specific to input ports
> (*setInputPortAttributes*) and output ports (*setOutputPortsAttributes*).
>
> Proposal is to have *SetOperatorAttribute*
> api, which will clearly indicate that user wants set attributes on the
> operator.
> ( setOperatorAttribute(Operator operator, Attribute key, T value) )
>
> Following will be the roles for the APIs
> *setAttributes* --> for setting Attributes for the whole DAG (
> setAttribute(Operator operator, Attribute key, T value) - can be
> deprecated )
> *setOperatorAttributes* --> for setting Attributes for the operator
>
> Let me know your thoughts.
>
> Thanks
[GitHub] apex-malhar pull request #313: Apexmalhar 2113 jdbc pojo output operator
Github user devtagare closed the pull request at: https://github.com/apache/apex-malhar/pull/313
[jira] [Commented] (APEXMALHAR-2086) Kafka Output Operator with Kafka 0.9 API
[ https://issues.apache.org/jira/browse/APEXMALHAR-2086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319133#comment-15319133 ] ASF GitHub Bot commented on APEXMALHAR-2086: Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/298#discussion_r66132271 --- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java --- @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.kafka; + +import java.util.Properties; +import javax.validation.constraints.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import com.datatorrent.api.Context; +import com.datatorrent.api.Operator; + +public abstract class AbstractKafkaOutputOperator<K, V> implements Operator +{ + @SuppressWarnings("unused") + private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaOutputOperator.class); --- End diff -- Can you remove the logger if it is unused?
> Kafka Output Operator with Kafka 0.9 API > > > Key: APEXMALHAR-2086 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2086 > Project: Apache Apex Malhar > Issue Type: New Feature > Reporter: Sandesh > Assignee: Sandesh > > Goal: 2 operators for Kafka output > 1. Simple Kafka Output Operator > - Supports at-least-once > - Exposes the most used producer properties as class properties > 2. Exactly-Once Kafka Output (not possible in all cases; will be > documented later) > > Design for Exactly-Once > Window Data Manager - stores the Kafka partition offsets. > Kafka Key - used by the operator = AppID#OperatorId > During recovery, the partially written window is re-created using the following > approach: > Tuples between the largest recovery offsets and the current offset are > checked. Based on the key, tuples written by other entities are > discarded. > Only tuples which are not in the recovered set are emitted. > Tuples need to be unique within the window.
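The recovery steps in the exactly-once design above can be sketched without any Kafka dependency. In this illustration, WrittenRecord is a hypothetical stand-in for a record read back from the topic between the recovered offsets and the current offset, and tuplesToEmit is an assumed helper name; the sketch only shows the filtering logic described in the issue, not the operator's actual implementation.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for a record already present in the Kafka topic. */
final class WrittenRecord
{
  final String key;   // writer identity, e.g. "AppID#OperatorId"
  final String value; // the tuple payload

  WrittenRecord(String key, String value)
  {
    this.key = key;
    this.value = value;
  }
}

final class ExactlyOnceRecoverySketch
{
  /**
   * Returns the tuples of the replayed window that still have to be written.
   *
   * @param operatorKey the key this operator stamps on its own records
   * @param writtenSinceCheckpoint records found between the recovered offsets and the current offset
   * @param replayedWindow tuples of the partially written window, replayed after recovery
   */
  static List<String> tuplesToEmit(String operatorKey, List<WrittenRecord> writtenSinceCheckpoint,
      List<String> replayedWindow)
  {
    // Build the recovered set, discarding records written by other entities.
    Set<String> recovered = new HashSet<>();
    for (WrittenRecord record : writtenSinceCheckpoint) {
      if (operatorKey.equals(record.key)) {
        recovered.add(record.value);
      }
    }

    // Emit only the tuples that are not already in the recovered set.
    List<String> pending = new ArrayList<>();
    for (String tuple : replayedWindow) {
      if (!recovered.contains(tuple)) {
        pending.add(tuple);
      }
    }
    return pending;
  }
}
```

Because the recovered set is keyed on tuple value, the sketch relies on the same constraint the issue states: tuples must be unique within the window, otherwise a legitimate repeat would be dropped.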
[GitHub] apex-malhar pull request #298: *For review only* [APEXMALHAR-2086] Kafka out...
Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/298#discussion_r66131514 --- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java --- @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.apex.malhar.kafka; + +import java.util.Properties; +import javax.validation.constraints.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import com.datatorrent.api.Context; +import com.datatorrent.api.Operator; + +public abstract class AbstractKafkaOutputOperator<K, V> implements Operator +{ + @SuppressWarnings("unused") + private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaOutputOperator.class); + private transient Producer<K, V> producer; // K is key partitioner, V is value type + @NotNull + private String topic = "topic1"; + private String producerProperties = ""; + private Properties configProperties = new Properties(); + public Properties getConfigProperties() + { + return configProperties; + } + + public void setConfigProperties(Properties configProperties) + { + this.configProperties.putAll(configProperties); + } + + /** + * setup producer configuration. + * @return ProducerConfig + */ + protected Properties createKafkaProducerConfig() + { + Properties prop = new Properties(); + for (String propString : producerProperties.split(",")) { + if (!propString.contains("=")) { + continue; + } + String[] keyVal = StringUtils.trim(propString).split("="); + prop.put(StringUtils.trim(keyVal[0]), StringUtils.trim(keyVal[1])); + } + + configProperties.putAll(prop); + + return configProperties; + } + + public Producer<K, V> getProducer() --- End diff -- Any reason why this is made public? It's being accessed from a subclass, so it can be made protected.
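The createKafkaProducerConfig method in the diff above turns a comma-separated key=value string into Properties. A standalone sketch of that parsing follows, using only java.util; the class and method names are hypothetical. Note one deliberate difference from the diff, which is an assumption on my part rather than something the reviewers asked for: the sketch splits each entry at the first '=' only, so a value that itself contains '=' is kept whole.

```java
import java.util.Properties;

final class ProducerPropertiesSketch
{
  /** Parses "k1=v1,k2=v2" into Properties, skipping entries without a key or without '='. */
  static Properties parse(String producerProperties)
  {
    Properties props = new Properties();
    for (String entry : producerProperties.split(",")) {
      String trimmed = entry.trim();
      int eq = trimmed.indexOf('=');
      if (eq <= 0) {
        continue; // no '=' at all, or an empty key
      }
      // Split at the first '=' only, so "a=b=c" yields key "a" and value "b=c".
      String key = trimmed.substring(0, eq).trim();
      String value = trimmed.substring(eq + 1).trim();
      props.setProperty(key, value);
    }
    return props;
  }
}
```

For example, parse("bootstrap.servers=localhost:9092, acks = all") would yield two entries with surrounding whitespace removed, while a malformed entry such as "broken" is ignored.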
[jira] [Commented] (APEXMALHAR-2086) Kafka Output Operator with Kafka 0.9 API
[ https://issues.apache.org/jira/browse/APEXMALHAR-2086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319118#comment-15319118 ] ASF GitHub Bot commented on APEXMALHAR-2086: Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/298#discussion_r66131334 --- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java --- @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.apex.malhar.kafka; + +import java.util.Properties; +import javax.validation.constraints.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import com.datatorrent.api.Context; +import com.datatorrent.api.Operator; + +public abstract class AbstractKafkaOutputOperator<K, V> implements Operator +{ + @SuppressWarnings("unused") + private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaOutputOperator.class); + private transient Producer<K, V> producer; // K is key partitioner, V is value type + @NotNull + private String topic = "topic1"; + private String producerProperties = ""; + private Properties configProperties = new Properties(); + public Properties getConfigProperties() --- End diff -- And also add javadoc for each of those.
[jira] [Commented] (APEXMALHAR-2086) Kafka Output Operator with Kafka 0.9 API
[ https://issues.apache.org/jira/browse/APEXMALHAR-2086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319117#comment-15319117 ] ASF GitHub Bot commented on APEXMALHAR-2086: Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/298#discussion_r66131266 --- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java --- @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.apex.malhar.kafka; + +import java.util.Properties; +import javax.validation.constraints.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import com.datatorrent.api.Context; +import com.datatorrent.api.Operator; + +public abstract class AbstractKafkaOutputOperator<K, V> implements Operator +{ + @SuppressWarnings("unused") + private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaOutputOperator.class); + private transient Producer<K, V> producer; // K is key partitioner, V is value type + @NotNull + private String topic = "topic1"; + private String producerProperties = ""; + private Properties configProperties = new Properties(); + public Properties getConfigProperties() --- End diff -- We don't have a constraint like this, but it would be good if you could organize the property access methods in one place.
[jira] [Commented] (APEXMALHAR-2086) Kafka Output Operator with Kafka 0.9 API
[ https://issues.apache.org/jira/browse/APEXMALHAR-2086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319114#comment-15319114 ] ASF GitHub Bot commented on APEXMALHAR-2086: Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/298#discussion_r66130894 --- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java --- @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.apex.malhar.kafka; + +import java.util.Properties; +import javax.validation.constraints.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import com.datatorrent.api.Context; +import com.datatorrent.api.Operator; + +public abstract class AbstractKafkaOutputOperator<K, V> implements Operator +{ + @SuppressWarnings("unused") + private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaOutputOperator.class); + private transient Producer<K, V> producer; // K is key partitioner, V is value type + @NotNull + private String topic = "topic1"; --- End diff -- Why is the default value of the "topic" property set to "topic1"? That invalidates the use of "@NotNull".
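The point of the review comment above is that a not-null constraint on a field with a non-null default can never fire. The sketch below illustrates this with a tiny reflection-based check. The @Required annotation and the missingFields helper are hypothetical stand-ins written for this example; they are not javax.validation.constraints.NotNull or the validator Apex uses.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a not-null constraint annotation. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Required
{
}

/** The field has a default, so the check below can never report it as missing. */
class WithDefaultTopic
{
  @Required
  String topic = "topic1";
}

/** No default: the check reports the field until a topic is configured. */
class WithoutDefaultTopic
{
  @Required
  String topic;
}

final class RequiredCheckSketch
{
  /** Returns the names of @Required fields on the bean that are still null. */
  static List<String> missingFields(Object bean)
  {
    List<String> missing = new ArrayList<>();
    for (Field field : bean.getClass().getDeclaredFields()) {
      if (!field.isAnnotationPresent(Required.class)) {
        continue;
      }
      field.setAccessible(true);
      try {
        if (field.get(bean) == null) {
          missing.add(field.getName());
        }
      } catch (IllegalAccessException e) {
        throw new IllegalStateException("cannot read field " + field.getName(), e);
      }
    }
    return missing;
  }
}
```

With the default in place, a forgotten topic silently becomes "topic1"; without it, the missing configuration is caught by validation before the operator is used.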
[jira] [Commented] (APEXMALHAR-2086) Kafka Output Operator with Kafka 0.9 API
[ https://issues.apache.org/jira/browse/APEXMALHAR-2086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319035#comment-15319035 ] ASF GitHub Bot commented on APEXMALHAR-2086: Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/298#discussion_r66122516 --- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java --- @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.kafka; + +import org.apache.kafka.clients.producer.ProducerRecord; +import com.datatorrent.api.DefaultInputPort; + +public class KafkaSinglePortOutputOperator extends AbstractKafkaOutputOperator --- End diff -- Can you make this evolving?
[GitHub] apex-site pull request #36: Adding announcements and meetups to Apex front p...
Github user asfgit closed the pull request at: https://github.com/apache/apex-site/pull/36
[GitHub] apex-site issue #36: Adding announcements and meetups to Apex front page
Github user tweise commented on the issue: https://github.com/apache/apex-site/pull/36 @sashadt Already live :-)
[GitHub] apex-site issue #36: Adding announcements and meetups to Apex front page
Github user sashadt commented on the issue: https://github.com/apache/apex-site/pull/36 @tweise Do you want to wait until more content is added, or should we merge this PR?
Re: Facing issue in merging PR
Thank you Thomas. For a few committers the issue seems to have been resolved, but I am still facing it while trying to commit. shubham@shubham:/code/projects/Apache/commit/apex-malhar$ git push origin master Username for 'https://git-wip-us.apache.org': shubham Password for 'https://shub...@git-wip-us.apache.org': Counting objects: 115, done. Delta compression using up to 8 threads. Compressing objects: 100% (23/23), done. Writing objects: 100% (33/33), 10.28 KiB | 0 bytes/s, done. Total 33 (delta 5), reused 0 (delta 0) remote: You are not authorized to edit this repository. remote: To https://git-wip-us.apache.org/repos/asf/apex-malhar.git ! [remote rejected] master -> master (pre-receive hook declined) error: failed to push some refs to 'https://git-wip-us.apache.org/repos/asf/apex-malhar.git' Am I missing any step here? Thanks, Shubham On Mon, Jun 6, 2016 at 9:33 PM, Thomas Weise wrote: > Looks like the committer group isn't setup yet. It is tracked here: > > https://issues.apache.org/jira/browse/INFRA-11719 > > > > On Mon, Jun 6, 2016 at 4:36 AM, Shubham Pathak > wrote: > > > Hello, > > > > I am facing the same issue. > > > > shubham@shubham:/code/projects/Apache/commit/apex-malhar$ git remote -v > > origin https://git-wip-us.apache.org/repos/asf/apex-malhar.git (fetch) > > origin https://git-wip-us.apache.org/repos/asf/apex-malhar.git (push) > > shubham@shubham:/code/projects/Apache/commit/apex-malhar$ git branch > > * master > > shubham@shubham:/code/projects/Apache/commit/apex-malhar$ git push > origin > > master > > Username for 'https://git-wip-us.apache.org': shubham > > Password for 'https://shub...@git-wip-us.apache.org': > > Counting objects: 107, done. > > Delta compression using up to 8 threads. > > Compressing objects: 100% (15/15), done. > > Writing objects: 100% (25/25), 9.51 KiB | 0 bytes/s, done. > > Total 25 (delta 3), reused 0 (delta 0) > > remote: You are not authorized to edit this repository. 
> > remote: > > To https://git-wip-us.apache.org/repos/asf/apex-malhar.git > > ! [remote rejected] master -> master (pre-receive hook declined) > > error: failed to push some refs to ' > > https://git-wip-us.apache.org/repos/asf/apex-malhar.git' > > > > Thanks, > > Shubham > > > > On Mon, Jun 6, 2016 at 5:04 PM, Yogi Devendra < > > devendra.vyavah...@gmail.com> > > wrote: > > > > > Hi, > > > > > > I am merging PR for the first time. Getting following error. > > > > > > remote: You are not authorized to edit this repository. > > > > > > > > > Given below is the trace from console. Is it permission issue for my > > > account or permission issue with the new repo links? > > > > > > > > > > > > > > > ✔ /repo/apex-malhar [master|✔] > > > 16:01 $ git remote -v > > > apache https://git-wip-us.apache.org/repos/asf/apex-malhar.git/ > (fetch) > > > apache https://git-wip-us.apache.org/repos/asf/apex-malhar.git/ > (push) > > > origin g...@github.com:apache/apex-malhar.git (fetch) > > > origin g...@github.com:apache/apex-malhar.git (push) > > > upstream g...@github.com:apache/apex-malhar.git (fetch) > > > upstream g...@github.com:apache/apex-malhar.git (push) > > > ✔ /repo/apex-malhar [master|✔] > > > 16:01 $ git fetch apache > > > ✔ /repo/apex-malhar [master|✔] > > > 16:02 $ git reset --hard apache/master > > > HEAD is now at 3e1cd8f APEXMALHAR-2112 Add logging exclusions in geode > > > dependency for running of all other contrib tests with 1.7 Java. 
> > > ✔ /repo/apex-malhar [master|✔] > > > 16:03 $ git pull https://github.com/tweise/incubator-apex-malhar > master > > > From https://github.com/tweise/incubator-apex-malhar > > > * branch master -> FETCH_HEAD > > > Updating 3e1cd8f..18588f3 > > > Fast-forward > > > > > > > > > library/src/main/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperator.java > > > | 21 +++-- > > > 1 file changed, 11 insertions(+), 10 deletions(-) > > > ✔ /repo/apex-malhar [master ↑·1|✔] > > > 16:03 $ git push apache master > > > Username for 'https://git-wip-us.apache.org': yogidevendra > > > Password for 'https://yogideven...@git-wip-us.apache.org': > > > Counting objects: 62, done. > > > Delta compression using up to 8 threads. > > > Compressing objects: 100% (10/10), done. > > > Writing objects: 100% (13/13), 1.10 KiB | 0 bytes/s, done. > > > Total 13 (delta 4), reused 0 (delta 0) > > > remote: You are not authorized to edit this repository. > > > remote: > > > To https://git-wip-us.apache.org/repos/asf/apex-malhar.git/ > > > ! [remote rejected] master -> master (pre-receive hook declined) > > > error: failed to push some refs to > > > 'https://git-wip-us.apache.org/repos/asf/apex-malhar.git/' > > > ✘-1 /repo/apex-malhar [master ↑·1|✔] > > > 16:04 $ git checkout -b master-try2 > > > Switched to a new branch 'master-try2' > > > ✔