[GitHub] apex-malhar pull request #298: [APEXMALHAR-2086] Kafka output: 0.9.1 first c...

2016-06-22 Thread siyuanh
Github user siyuanh commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/298#discussion_r68179198
  
--- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java ---
@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+/**
+ * Kafka output operator with exactly-once processing semantics under certain conditions.
+ *
+ * This operator uses the *key* to distinguish the messages written by a particular instance of the output operator.
+ * Users of the operator can only use the *value* for storing data.
+ *
+ * @displayName Single Port Exactly Once Kafka Output(0.9.0)
+ * @category Messaging
+ * @tags output operator
+ *
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOutputOperator
+    implements Operator.CheckpointNotificationListener
+{
+  private transient String key;
+  private transient String appName;
+  private transient Integer operatorId;
+  private transient Long windowId;
+  private transient Map<T, Integer> partialWindowTuples = new HashMap<>();
+  private transient KafkaConsumer consumer;
+
+  private WindowDataManager windowDataManager = new FSWindowDataManager();
+  private final int KAFKA_CONNECT_ATTEMPT = 10;
+  private final String KEY_SEPARATOR = "#";
+  private final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+  private final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+
+  public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>()
+  {
+    @Override
+    public void process(T tuple)
+    {
+      sendTuple(tuple);
+    }
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+
+    this.operatorId = context.getId();
+    this.windowDataManager.setup(context);
+    this.appName = context.getValue(Context.DAGContext.APPLICATION_NAME);
+    this.key = appName + KEY_SEPARATOR + (new Integer(operatorId));
+    this.consumer = KafkaConsumerInit();
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    this.windowId = windowId;
+
+    if (windowId == windowDataManager.getLargestRecoveryWindow()) {
+      rebuildPartialWindow();
+    }
+  }
+
+  @Override
+  public void checkpointed(long windowId)
+  {
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+    try {
+      windowDataManager.deleteUpTo(operatorId, windowId);

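The diff above references rebuildPartialWindow() and sendTuple(T) without showing their bodies. A minimal sketch of the keyed-deduplication idea described in the class comment follows; this is an illustration, not the PR's implementation, and it assumes the consumer field is typed KafkaConsumer<String, T> and that the base class exposes getProducer() and getTopic():

    // Sketch only: re-read this operator's own messages (recognized by the
    // key appName + "#" + operatorId) and count them per tuple value.
    private void rebuildPartialWindow()
    {
      ConsumerRecords<String, T> records = consumer.poll(100);
      for (ConsumerRecord<String, T> record : records) {
        if (key.equals(record.key())) {
          Integer count = partialWindowTuples.get(record.value());
          partialWindowTuples.put(record.value(), count == null ? 1 : count + 1);
        }
      }
    }

    // Sketch only: during the recovery window, drop tuples that were already
    // written to Kafka before the failure instead of writing them twice.
    private void sendTuple(T tuple)
    {
      Integer remaining = partialWindowTuples.get(tuple);
      if (remaining != null && remaining > 0) {
        partialWindowTuples.put(tuple, remaining - 1);
        return;
      }
      getProducer().send(new ProducerRecord<>(getTopic(), key, tuple));
    }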

[GitHub] apex-malhar pull request #292: APEXMALHAR-1957: Updated HBase Input Operator

2016-06-22 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/292#discussion_r68177487
  
--- Diff: contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java ---
@@ -37,20 +37,21 @@
 import com.datatorrent.api.Context.OperatorContext;
 
 /**
+ * HBasePOJOInputOperator reads data from an HBase store, converts it to a POJO and puts it on the output port.
+ * The read from HBase is asynchronous.
  * @displayName HBase Input Operator
  * @category Input
  * @tags database, nosql, pojo, hbase
  * @since 3.1.0
  */
 @Evolving
-public class HBasePOJOInputOperator extends HBaseInputOperator
+public class HBasePOJOInputOperator extends HBaseScanOperator
 {
   private TableInfo tableInfo;
-  protected HBaseStore store;
   private String pojoTypeName;
-  private String startRow;
-  private String lastReadRow;
+  private Scan scan;
 
+  // Transients
   protected transient Class pojoType;
--- End diff --

Did not change this as not related to the current PR.




[jira] [Commented] (APEXMALHAR-1957) Improve HBasePOJOInputOperator with support for threaded read

2016-06-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15345770#comment-15345770
 ] 

ASF GitHub Bot commented on APEXMALHAR-1957:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/292#discussion_r68177411
  
--- Diff: contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java ---
@@ -69,116 +70,105 @@ public Object convertValue( HBaseFieldInfo fieldInfo, Object value)
   public void setup(OperatorContext context)
   {
     try {
-      store.connect();
+      super.setup(context);
       pojoType = Class.forName(pojoTypeName);
       pojoType.newInstance();   //try create new instance to verify the class.
       rowSetter = PojoUtils.createSetter(pojoType, tableInfo.getRowOrIdExpression(), String.class);
-      fieldValueGenerator = FieldValueGenerator.getFieldValueGenerator(pojoType, tableInfo.getFieldsInfo());
+      fieldValueGenerator = HBaseFieldValueGenerator.getHBaseFieldValueGenerator(pojoType, tableInfo.getFieldsInfo());
--- End diff --

Done


> Improve HBasePOJOInputOperator with support for threaded read
> --------------------------------------------------------------
>
> Key: APEXMALHAR-1957
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1957
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Bhupesh Chawda
>Assignee: Bhupesh Chawda
>
> Add the following support to Hbase POJO Input Operator:
> * Add support for threaded read
> * Allow to specify a set of "column family: column" and fetch data only for 
> these columns
> * Allow to specify an end row key to stop scanning
> * Add metrics






[GitHub] apex-malhar pull request #292: APEXMALHAR-1957: Updated HBase Input Operator

2016-06-22 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/292#discussion_r68177086
  
--- Diff: contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseScanOperator.java ---
@@ -79,4 +170,79 @@ public void emitTuples()
    */
   protected abstract T getTuple(Result result);
 
+  /**
+   * Returns the start row key in the table as set previously
+   * @return {@link #startRow}
+   */
+  public String getStartRow()
+  {
+    return startRow;
+  }
+
+  /**
+   * Sets the start row key in the table from where the scan should begin
+   * @param startRow
+   */
+  public void setStartRow(String startRow)
+  {
+    this.startRow = startRow;
+  }
+
+  /**
+   * Returns the end row key in the table as set previously
+   * @return {@link #endRow}
+   */
+  public String getEndRow()
+  {
+    return endRow;
+  }
+
+  /**
+   * Sets the end row key in the table where the scan should end
+   * @param endRow
+   */
+  public void setEndRow(String endRow)
+  {
+    this.endRow = endRow;
+  }
+
+  /**
+   * Returns the last read row key from the HBase table
+   * @return {@link #lastReadRow}
+   */
+  public String getLastReadRow()
+  {
+    return lastReadRow;
+  }
+
+  /**
+   * Sets the last read row key from the HBase table. After a failure, the new scan will start from this row key
+   * @param lastReadRow
+   */
+  public void setLastReadRow(String lastReadRow)
+  {
+    this.lastReadRow = lastReadRow;
+  }
+
+  /**
+   * Returns the Scan HINT_LOOKAHEAD parameter as configured. Default is {@value #DEF_HINT_SCAN_LOOKAHEAD}
--- End diff --

It seems this is now deprecated and optimization will happen automatically. 
Will remove this from the changes.




[GitHub] apex-malhar pull request #292: APEXMALHAR-1957: Updated HBase Input Operator

2016-06-22 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/292#discussion_r68172645
  
--- Diff: contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java ---
@@ -69,116 +70,105 @@ public Object convertValue( HBaseFieldInfo fieldInfo, Object value)
   public void setup(OperatorContext context)
   {
     try {
-      store.connect();
+      super.setup(context);
       pojoType = Class.forName(pojoTypeName);
       pojoType.newInstance();   //try create new instance to verify the class.
       rowSetter = PojoUtils.createSetter(pojoType, tableInfo.getRowOrIdExpression(), String.class);
-      fieldValueGenerator = FieldValueGenerator.getFieldValueGenerator(pojoType, tableInfo.getFieldsInfo());
+      fieldValueGenerator = HBaseFieldValueGenerator.getHBaseFieldValueGenerator(pojoType, tableInfo.getFieldsInfo());
       valueConverter = new BytesValueConverter();
+      scan = new Scan();
     } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
   }
 
   @Override
-  public void beginWindow(long windowId)
-  {
-  }
-
-  @Override
-  public void teardown()
-  {
-    try {
-      store.disconnect();
-    } catch (IOException ex) {
-      throw new RuntimeException(ex);
-    }
-  }
-
-  @Override
-  public void emitTuples()
+  protected Object getTuple(Result result)
   {
     try {
-      Scan scan = nextScan();
-      if (scan == null)
-        return;
-
-      ResultScanner resultScanner = store.getTable().getScanner(scan);
-
-      while (true) {
-        Result result = resultScanner.next();
-        if (result == null)
-          break;
-
-        String readRow = Bytes.toString(result.getRow());
-        if( readRow.equals( lastReadRow ))
-          continue;
-
-        Object instance = pojoType.newInstance();
-        rowSetter.set(instance, readRow);
-
-        List<Cell> cells = result.listCells();
+      String readRow = Bytes.toString(result.getRow());
+      if( readRow.equals( getLastReadRow() )) {
--- End diff --

This will need changes to the signature of getTuple. Here we are assuming
that getTuple will extract the HBase row and then convert it to the output
type. However, the lastReadRow is stored in raw form. In order to move this
to the base class, we'll have to map the tuple to a raw type and then check
if the record is the one we saw last.



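A sketch of the restructuring discussed in the comment above, with hypothetical names: if the base class compared rows in raw form before converting, getTuple() could stay conversion-only:

    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    // Hypothetical shape: the base class dedups on the raw row key before any
    // conversion, so subclasses never see a row that was already emitted.
    public abstract class HBaseScanOperatorSketch<T>
    {
      private String lastReadRow;

      // Conversion only; no dedup logic left in subclasses.
      protected abstract T getTuple(Result result);

      // Called for every scanned Result, e.g. from emitTuples().
      protected T convertSkippingLastRead(Result result)
      {
        String readRow = Bytes.toString(result.getRow());
        if (readRow.equals(lastReadRow)) {
          return null;  // this row was already emitted before the failure
        }
        lastReadRow = readRow;
        return getTuple(result);
      }
    }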



[GitHub] apex-core pull request #350: APEXCORE-222 Purging the stale data present in ...

2016-06-22 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/apex-core/pull/350#discussion_r68166717
  
--- Diff: engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---
@@ -769,7 +769,11 @@ public void processHeartbeatResponse(ContainerHeartbeatResponse rsp)
     }
 
     if (rsp.committedWindowId != lastCommittedWindowId) {
+
       lastCommittedWindowId = rsp.committedWindowId;
+
+      bufferServer.purge(lastCommittedWindowId);
--- End diff --

In StreamingContainerManager windowId to purge is decremented by 1. Please 
check if the same decrement needs to be applied here?



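If the decrement turns out to be needed, the fix would be on the order of the following sketch (untested; it also guards against the NPE raised in a later comment below):

    if (rsp.committedWindowId != lastCommittedWindowId) {
      lastCommittedWindowId = rsp.committedWindowId;
      if (bufferServer != null) {
        // mirror StreamingContainerManager, which purges up to committedWindowId - 1
        bufferServer.purge(lastCommittedWindowId - 1);
      }
    }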


[GitHub] apex-core pull request #350: APEXCORE-222 Purging the stale data present in ...

2016-06-22 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/apex-core/pull/350#discussion_r68164956
  
--- Diff: bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java ---
@@ -178,19 +178,23 @@ public void reset()
 
   public void purge(final int baseSeconds, final int windowId)
--- End diff --

Remove `purge(int, int)` API (convert it to `purge(long)`).



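A sketch of the suggested signature change, assuming the usual Apex encoding of a long window ID (base seconds in the high 32 bits, window count in the low 32 bits):

    public void purge(final long windowId)
    {
      final int baseSeconds = (int)(windowId >> 32);  // high 32 bits
      final int purgeWindowId = (int)windowId;        // low 32 bits
      // ... existing purge(baseSeconds, windowId) body continues here ...
    }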

[jira] [Commented] (APEXCORE-222) Delegate Buffer Server purge to StreamingContainer

2016-06-22 Thread Thomas Weise (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15345565#comment-15345565
 ] 

Thomas Weise commented on APEXCORE-222:
---

You need to analyze the exact sequence of events during recovery. As I recall 
there is a scenario where the buffer needs to be truncated prior to operator 
deployment.

> Delegate Buffer Server purge to StreamingContainer
> --------------------------------------------------
>
> Key: APEXCORE-222
> URL: https://issues.apache.org/jira/browse/APEXCORE-222
> Project: Apache Apex Core
>  Issue Type: Improvement
>Reporter: Thomas Weise
>Assignee: Sandesh
>
> Currently the purge requests are sent to the buffer servers from the app 
> master. This interaction exists parallel to the heartbeat protocol. Instead, 
> the committed window ID that is propagated through the heartbeat response can 
> be used in StreamingContainer to initiate the purge with the local buffer 
> server, similar to how the committed callback on the operator occurs.





[GitHub] apex-core pull request #350: APEXCORE-222 Purging the stale data present in ...

2016-06-22 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/apex-core/pull/350#discussion_r68163971
  
--- Diff: engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---
@@ -769,7 +769,11 @@ public void processHeartbeatResponse(ContainerHeartbeatResponse rsp)
     }
 
     if (rsp.committedWindowId != lastCommittedWindowId) {
+
       lastCommittedWindowId = rsp.committedWindowId;
+
+      bufferServer.purge(lastCommittedWindowId);
--- End diff --

`bufferserver.purge()` is subject to NPE.






[jira] [Commented] (APEXMALHAR-2045) Add bandwidth control feature to Apex

2016-06-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15345545#comment-15345545
 ] 

ASF GitHub Bot commented on APEXMALHAR-2045:


Github user vrozov commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/279#discussion_r68163461
  
--- Diff: library/src/test/java/com/datatorrent/lib/bandwidth/BandwidthManagerTest.java ---
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.lib.bandwidth;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.apache.commons.io.FileUtils;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+
+public class BandwidthManagerTest
+{
+  private static class TestMeta extends TestWatcher
+  {
+    private String applicationPath;
+    private BandwidthManager underTest;
+    private Context.OperatorContext context;
+    private long bandwidthLimit = 10L;
+    private ScheduledExecutorTestService mockschedular;
--- End diff --

Consider making ScheduledExecutorTestService private, or better, use an
anonymous ScheduledExecutorService interface implementation.


> Add bandwidth control feature to Apex
> -------------------------------------
>
> Key: APEXMALHAR-2045
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2045
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Priyanka Gugale
>Assignee: Priyanka Gugale
>
>  bandwidth restrictions on input operator for number of bytes to be consumed 
> per second.




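One way to realize that suggestion, sketched with hypothetical names and assuming the test only exercises scheduleAtFixedRate: capture the task and run it on demand so the test controls time deterministically:

    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    // Hypothetical test helper: never schedules on a real clock.
    class ManualClockExecutor extends ScheduledThreadPoolExecutor
    {
      private Runnable task;

      ManualClockExecutor()
      {
        super(1);
      }

      @Override
      public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
      {
        this.task = command;  // capture instead of scheduling
        return null;
      }

      void tick()
      {
        task.run();  // deterministically advance one "period"
      }
    }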

[jira] [Commented] (APEXMALHAR-2045) Add bandwidth control feature to Apex

2016-06-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15345519#comment-15345519
 ] 

ASF GitHub Bot commented on APEXMALHAR-2045:


Github user vrozov commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/279#discussion_r68161618
  
--- Diff: library/src/test/java/com/datatorrent/lib/bandwidth/BandwidthManagerTest.java ---
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.lib.bandwidth;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.apache.commons.io.FileUtils;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+
+public class BandwidthManagerTest
+{
+  private static class TestMeta extends TestWatcher
+  {
+    private String applicationPath;
+    private BandwidthManager underTest;
+    private Context.OperatorContext context;
+    private long bandwidthLimit = 10L;
+    private ScheduledExecutorTestService mockschedular;
+
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      mockschedular = new ScheduledExecutorTestService();
+      underTest = new BandwidthManager(mockschedular);
+      underTest.setBandwidth(bandwidthLimit);
+
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+      Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(DAG.APPLICATION_PATH, applicationPath);
+      context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+      underTest.setup(context);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      underTest.teardown();
+      try {
+        FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testBandwidthForLargeBlocks() throws InterruptedException
+  {
+    String data = "Tuple: test data to be emitted.";
+    long startTime = System.currentTimeMillis();
+    testMeta.underTest.consumeBandwidth(data.length());
+    while (!testMeta.underTest.canConsumeBandwidth()) {
+      Thread.sleep(1000);
--- End diff --

Waiting for a second is not necessary here. Can it be replaced with a count
instead?



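A sketch of that suggestion, using the names from the quoted test: bound the wait with a retry count and a short poll instead of one-second sleeps:

    int retries = 100;
    while (!testMeta.underTest.canConsumeBandwidth() && retries-- > 0) {
      Thread.sleep(10);  // short poll keeps the test fast and bounded
    }
    Assert.assertTrue("bandwidth was not released in time", retries > 0);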

[jira] [Commented] (APEXMALHAR-2048) Create concrete implementation of ArrayListMultiMap using managed state.

2016-06-22 Thread Thomas Weise (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15345458#comment-15345458
 ] 

Thomas Weise commented on APEXMALHAR-2048:
--

APEXMALHAR-2085 should use this for window state management.

> Create concrete implementation of ArrayListMultiMap using managed state.
> --------------------------------------------------------------------------
>
> Key: APEXMALHAR-2048
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2048
> Project: Apache Apex Malhar
>  Issue Type: Sub-task
>Reporter: Timothy Farkas
>Assignee: Timothy Farkas
>






[jira] [Commented] (APEXMALHAR-2045) Add bandwidth control feature to Apex

2016-06-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15345341#comment-15345341
 ] 

ASF GitHub Bot commented on APEXMALHAR-2045:


Github user vrozov commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/279#discussion_r68150902
  
--- Diff: library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthManager.java ---
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.bandwidth;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+
+/**
+ * BandwidthManager keeps track of bandwidth consumption and provides a limit on the maximum bandwidth that can be
+ * consumed at any moment. It accumulates bandwidth up to certain limits so that accumulated bandwidth can be used
+ * over a period of time.
+ */
+public class BandwidthManager implements Component<Context.OperatorContext>
+{
+  private static final Logger LOG = LoggerFactory.getLogger(BandwidthManager.class);
+  /**
+   * Maximum bandwidth that can be consumed in bytes/sec
+   */
+  private long bandwidthLimit = Long.MAX_VALUE;
+  private transient long currentBandwidthConsumption;
+  private final transient ScheduledExecutorService scheduler;
+  private final transient Object lock = new Object();
+
+  public BandwidthManager()
+  {
+    scheduler = Executors.newScheduledThreadPool(1);
+  }
+
+  BandwidthManager(ScheduledExecutorService scheduler)
+  {
+    this.scheduler = scheduler;
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    scheduler.scheduleAtFixedRate(new BandwidthAccumulator(), 1, 1, TimeUnit.SECONDS);
+  }
+
+  public boolean canConsumeBandwidth()
+  {
+    if (!isBandwidthRestricted()) {
+      return true;
+    }
+    synchronized (lock) {
+      if (currentBandwidthConsumption >= 0) {
--- End diff --

`return currentBandwidthConsumption >= 0`?
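
Applied to the diff above, the suggested simplification would read as follows (a minimal sketch in Java, assuming the rest of the class stays as shown in the diff):

    public boolean canConsumeBandwidth()
    {
      if (!isBandwidthRestricted()) {
        return true;
      }
      synchronized (lock) {
        // A non-negative accumulated budget means another tuple may be sent.
        return currentBandwidthConsumption >= 0;
      }
    }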


> Add bandwidth control feature to Apex
> -
>
> Key: APEXMALHAR-2045
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2045
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Priyanka Gugale
>Assignee: Priyanka Gugale
>
>  Add bandwidth restrictions on input operators, limiting the number of bytes
> consumed per second.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] apex-malhar pull request #279: APEXMALHAR-2045: Adding bandwidth control fea...

2016-06-22 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/279#discussion_r68151025
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthManager.java ---
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.bandwidth;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+
+/**
+ * BandwidthManager keeps track of bandwidth consumption and provides a limit on the maximum bandwidth that can be
+ * consumed at any moment. It accumulates bandwidth up to certain limits so that the accumulated bandwidth can be
+ * used over a period of time.
+ */
+public class BandwidthManager implements Component<Context.OperatorContext>
+{
+  private static final Logger LOG = LoggerFactory.getLogger(BandwidthManager.class);
+  /**
+   * Maximum bandwidth that can be consumed in bytes/sec
+   */
+  private long bandwidthLimit = Long.MAX_VALUE;
+  private transient long currentBandwidthConsumption;
+  private final transient ScheduledExecutorService scheduler;
+  private final transient Object lock = new Object();
+
+  public BandwidthManager()
+  {
+    scheduler = Executors.newScheduledThreadPool(1);
+  }
+
+  BandwidthManager(ScheduledExecutorService scheduler)
+  {
+    this.scheduler = scheduler;
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    scheduler.scheduleAtFixedRate(new BandwidthAccumulator(), 1, 1, TimeUnit.SECONDS);
+  }
+
+  public boolean canConsumeBandwidth()
+  {
+    if (!isBandwidthRestricted()) {
+      return true;
+    }
+    synchronized (lock) {
+      if (currentBandwidthConsumption >= 0) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public void consumeBandwidth(long sentTupleSize)
+  {
+    if (isBandwidthRestricted()) {
+      synchronized (lock) {
+        currentBandwidthConsumption -= sentTupleSize;
+      }
+    }
+  }
+
+  public boolean isBandwidthRestricted()
+  {
+    if (bandwidthLimit == Long.MAX_VALUE) {
--- End diff --

`return bandwidthLimit != Long.MAX_VALUE`?
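
Applied to the method above, the suggestion collapses the if/else into a single expression with identical behavior (a minimal sketch):

    public boolean isBandwidthRestricted()
    {
      // Long.MAX_VALUE is the sentinel for "no limit configured".
      return bandwidthLimit != Long.MAX_VALUE;
    }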


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] apex-malhar pull request #279: APEXMALHAR-2045: Adding bandwidth control fea...

2016-06-22 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/279#discussion_r68150902
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthManager.java ---
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.bandwidth;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+
+/**
+ * BandwidthManager keeps track of bandwidth consumption and provides a limit on the maximum bandwidth that can be
+ * consumed at any moment. It accumulates bandwidth up to certain limits so that the accumulated bandwidth can be
+ * used over a period of time.
+ */
+public class BandwidthManager implements Component<Context.OperatorContext>
+{
+  private static final Logger LOG = LoggerFactory.getLogger(BandwidthManager.class);
+  /**
+   * Maximum bandwidth that can be consumed in bytes/sec
+   */
+  private long bandwidthLimit = Long.MAX_VALUE;
+  private transient long currentBandwidthConsumption;
+  private final transient ScheduledExecutorService scheduler;
+  private final transient Object lock = new Object();
+
+  public BandwidthManager()
+  {
+    scheduler = Executors.newScheduledThreadPool(1);
+  }
+
+  BandwidthManager(ScheduledExecutorService scheduler)
+  {
+    this.scheduler = scheduler;
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    scheduler.scheduleAtFixedRate(new BandwidthAccumulator(), 1, 1, TimeUnit.SECONDS);
+  }
+
+  public boolean canConsumeBandwidth()
+  {
+    if (!isBandwidthRestricted()) {
+      return true;
+    }
+    synchronized (lock) {
+      if (currentBandwidthConsumption >= 0) {
--- End diff --

`return currentBandwidthConsumption >= 0`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (APEXMALHAR-2045) Add bandwidth control feature to Apex

2016-06-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15345307#comment-15345307
 ] 

ASF GitHub Bot commented on APEXMALHAR-2045:


Github user vrozov commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/279#discussion_r68148617
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthManager.java ---
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.bandwidth;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+
+/**
+ * BandwidthManager keeps track of bandwidth consumption and provides a limit on the maximum bandwidth that can be
+ * consumed at any moment. It accumulates bandwidth up to certain limits so that the accumulated bandwidth can be
+ * used over a period of time.
+ */
+public class BandwidthManager implements Component<Context.OperatorContext>
+{
+  private static final Logger LOG = LoggerFactory.getLogger(BandwidthManager.class);
+  /**
+   * Maximum bandwidth that can be consumed in bytes/sec
+   */
+  private long bandwidthLimit = Long.MAX_VALUE;
+  private transient long currentBandwidthConsumption;
+  private final transient ScheduledExecutorService scheduler;
+  private final transient Object lock = new Object();
+
+  public BandwidthManager()
+  {
+    scheduler = Executors.newScheduledThreadPool(1);
+  }
+
+  BandwidthManager(ScheduledExecutorService scheduler)
+  {
+    this.scheduler = scheduler;
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    scheduler.scheduleAtFixedRate(new BandwidthAccumulator(), 1, 1, TimeUnit.SECONDS);
+  }
+
+  public boolean canConsumeBandwidth()
+  {
+    if (!isBandwidthRestricted()) {
+      return true;
+    }
+    synchronized (lock) {
+      if (currentBandwidthConsumption >= 0) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public void consumeBandwidth(long sentTupleSize)
+  {
+    if (isBandwidthRestricted()) {
+      synchronized (lock) {
+        currentBandwidthConsumption -= sentTupleSize;
+      }
+    }
+  }
+
+  public boolean isBandwidthRestricted()
+  {
+    if (bandwidthLimit == Long.MAX_VALUE) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * get maximum bandwidth that can be consumed in bytes/sec
+   *
+   * @return
+   */
+  public long getBandwidth()
+  {
+    return bandwidthLimit;
+  }
+
+  /**
+   * Set maximum bandwidth that can be consumed in bytes/sec
+   *
+   * @param bandwidth
+   */
+  public void setBandwidth(long bandwidth)
+  {
+    this.bandwidthLimit = bandwidth;
+    LOG.info("Bandwidth limit is set to: " + bandwidth + " bytes/sec");
--- End diff --

Is INFO level necessary, or can it be changed to DEBUG?
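
If the level is lowered as suggested, the setter could also switch to SLF4J's parameterized form, which avoids the string concatenation (a minimal sketch):

    public void setBandwidth(long bandwidth)
    {
      this.bandwidthLimit = bandwidth;
      // DEBUG keeps the message available for troubleshooting without adding noise at INFO.
      LOG.debug("Bandwidth limit is set to: {} bytes/sec", bandwidth);
    }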


> Add bandwidth control feature to Apex
> -
>
> Key: APEXMALHAR-2045
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2045
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Priyanka Gugale
>Assignee: Priyanka Gugale
>
>  Add bandwidth restrictions on input operators, limiting the number of bytes
> consumed per second.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2045) Add bandwidth control feature to Apex

2016-06-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15345304#comment-15345304
 ] 

ASF GitHub Bot commented on APEXMALHAR-2045:


Github user vrozov commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/279#discussion_r68148125
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthPartitioner.java 
---
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.bandwidth;
+
+import java.util.Collection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+
+public class BandwidthPartitioner<T extends BandwidthLimitingOperator> extends StatelessPartitioner<T>
+{
+  private static final long serialVersionUID = -7502505996637650237L;
+  private static final Logger LOG = LoggerFactory.getLogger(BandwidthPartitioner.class);
+
+  /**
+   * This creates a partitioner which creates only one partition.
+   */
+  public BandwidthPartitioner()
+  {
+  }
+
+  /**
+   * This constructor is used to create the partitioner from a property.
+   *
+   * @param value A string containing the integer number of partitions to create
+   */
+  public BandwidthPartitioner(String value)
+  {
+    super(value);
+  }
+
+  /**
+   * This creates a partitioner which creates partitionCount partitions.
+   *
+   * @param partitionCount The number of partitions to create.
+   */
+  public BandwidthPartitioner(int partitionCount)
+  {
+    super(partitionCount);
+  }
+
+  @Override
+  public Collection<Partition<T>> definePartitions(Collection<Partition<T>> partitions, PartitioningContext context)
+  {
+    long currentBandwidth = partitions.iterator().next().getPartitionedInstance().getBandwidthManager().getBandwidth()
+        * partitions.size();
+    Collection<Partition<T>> newpartitions = super.definePartitions(partitions, context);
+    return updateBandwidth(newpartitions, currentBandwidth);
+  }
+
+  public Collection<Partition<T>> updateBandwidth(Collection<Partition<T>> newpartitions, long currentBandwidth)
+  {
+    LOG.info("Updating bandwidth of partitions.");
--- End diff --

Is this logging necessary? It does not seem to add much value.
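
If the log line is dropped, the method reduces to the redistribution itself. The remainder of updateBandwidth is not shown in this diff, so the loop below is only a hypothetical sketch of splitting the previous total budget evenly across the new partitions:

    public Collection<Partition<T>> updateBandwidth(Collection<Partition<T>> newpartitions, long currentBandwidth)
    {
      // Hypothetical body: divide the old total bandwidth evenly among the new partitions.
      long bandwidthPerPartition = currentBandwidth / newpartitions.size();
      for (Partition<T> partition : newpartitions) {
        partition.getPartitionedInstance().getBandwidthManager().setBandwidth(bandwidthPerPartition);
      }
      return newpartitions;
    }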


> Add bandwidth control feature to Apex
> -
>
> Key: APEXMALHAR-2045
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2045
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Priyanka Gugale
>Assignee: Priyanka Gugale
>
>  Add bandwidth restrictions on input operators, limiting the number of bytes
> consumed per second.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] apex-malhar pull request #279: APEXMALHAR-2045: Adding bandwidth control fea...

2016-06-22 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/279#discussion_r68148125
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthPartitioner.java 
---
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.bandwidth;
+
+import java.util.Collection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+
+public class BandwidthPartitioner<T extends BandwidthLimitingOperator> extends StatelessPartitioner<T>
+{
+  private static final long serialVersionUID = -7502505996637650237L;
+  private static final Logger LOG = LoggerFactory.getLogger(BandwidthPartitioner.class);
+
+  /**
+   * This creates a partitioner which creates only one partition.
+   */
+  public BandwidthPartitioner()
+  {
+  }
+
+  /**
+   * This constructor is used to create the partitioner from a property.
+   *
+   * @param value A string containing the integer number of partitions to create
+   */
+  public BandwidthPartitioner(String value)
+  {
+    super(value);
+  }
+
+  /**
+   * This creates a partitioner which creates partitionCount partitions.
+   *
+   * @param partitionCount The number of partitions to create.
+   */
+  public BandwidthPartitioner(int partitionCount)
+  {
+    super(partitionCount);
+  }
+
+  @Override
+  public Collection<Partition<T>> definePartitions(Collection<Partition<T>> partitions, PartitioningContext context)
+  {
+    long currentBandwidth = partitions.iterator().next().getPartitionedInstance().getBandwidthManager().getBandwidth()
+        * partitions.size();
+    Collection<Partition<T>> newpartitions = super.definePartitions(partitions, context);
+    return updateBandwidth(newpartitions, currentBandwidth);
+  }
+
+  public Collection<Partition<T>> updateBandwidth(Collection<Partition<T>> newpartitions, long currentBandwidth)
+  {
+    LOG.info("Updating bandwidth of partitions.");
--- End diff --

Is this logging necessary? It does not seem to add much value.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (APEXMALHAR-2045) Add bandwidth control feature to Apex

2016-06-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15345292#comment-15345292
 ] 

ASF GitHub Bot commented on APEXMALHAR-2045:


Github user vrozov commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/279#discussion_r68147701
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthManager.java ---
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.bandwidth;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+
+/**
+ * BandwidthManager keeps track of bandwidth consumption and provides a limit on the maximum bandwidth that can be
+ * consumed at any moment. It accumulates bandwidth up to certain limits so that the accumulated bandwidth can be
+ * used over a period of time.
+ */
+public class BandwidthManager implements Component<Context.OperatorContext>
+{
+  private static final Logger LOG = LoggerFactory.getLogger(BandwidthManager.class);
+  /**
+   * Maximum bandwidth that can be consumed in bytes/sec
+   */
+  private long bandwidthLimit = Long.MAX_VALUE;
+  private transient long currentBandwidthConsumption;
+  private final transient ScheduledExecutorService scheduler;
+  private final transient Object lock = new Object();
+
+  public BandwidthManager()
+  {
+    scheduler = Executors.newScheduledThreadPool(1);
+  }
+
+  BandwidthManager(ScheduledExecutorService scheduler)
+  {
+    this.scheduler = scheduler;
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    scheduler.scheduleAtFixedRate(new BandwidthAccumulator(), 1, 1, TimeUnit.SECONDS);
+  }
+
+  public boolean canConsumeBandwidth()
+  {
+    if (!isBandwidthRestricted()) {
+      return true;
+    }
+    synchronized (lock) {
+      if (currentBandwidthConsumption >= 0) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public void consumeBandwidth(long sentTupleSize)
+  {
+    if (isBandwidthRestricted()) {
+      synchronized (lock) {
+        currentBandwidthConsumption -= sentTupleSize;
+      }
+    }
+  }
+
+  public boolean isBandwidthRestricted()
+  {
+    if (bandwidthLimit == Long.MAX_VALUE) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * get maximum bandwidth that can be consumed in bytes/sec
+   *
+   * @return
+   */
+  public long getBandwidth()
+  {
+    return bandwidthLimit;
+  }
+
+  /**
+   * Set maximum bandwidth that can be consumed in bytes/sec
+   *
+   * @param bandwidth
+   */
+  public void setBandwidth(long bandwidth)
+  {
+    this.bandwidthLimit = bandwidth;
+    LOG.info("Bandwidth limit is set to: " + bandwidth + " bytes/sec");
+  }
+
+  @Override
+  public void teardown()
+  {
+    scheduler.shutdownNow();
+  }
+
+  class BandwidthAccumulator implements Runnable
--- End diff --

Is there a reason to make this class visible outside of BandwidthManager? 
Please make it either anonymous (preferable) or private.
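
The anonymous variant the reviewer prefers could look like the sketch below. The body of BandwidthAccumulator is not shown in this diff, so the run() logic here is only a placeholder for whatever the accumulator actually does:

    @Override
    public void setup(OperatorContext context)
    {
      scheduler.scheduleAtFixedRate(new Runnable()
      {
        @Override
        public void run()
        {
          synchronized (lock) {
            // Placeholder: replenish the per-second bandwidth budget here,
            // e.g. capped accumulation up to the configured limit.
          }
        }
      }, 1, 1, TimeUnit.SECONDS);
    }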


> Add bandwidth control feature to Apex
> -
>
> Key: APEXMALHAR-2045
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2045
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Priyanka Gugale
>Assignee: Priyanka Gugale
>
>  Add bandwidth restrictions on input operators, limiting the number of bytes consumed

[GitHub] apex-malhar pull request #279: APEXMALHAR-2045: Adding bandwidth control fea...

2016-06-22 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/279#discussion_r68147701
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthManager.java ---
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.bandwidth;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+
+/**
+ * BandwidthManager keeps track of bandwidth consumption and provides a limit on the maximum bandwidth that can be
+ * consumed at any moment. It accumulates bandwidth up to certain limits so that the accumulated bandwidth can be
+ * used over a period of time.
+ */
+public class BandwidthManager implements Component<Context.OperatorContext>
+{
+  private static final Logger LOG = LoggerFactory.getLogger(BandwidthManager.class);
+  /**
+   * Maximum bandwidth that can be consumed in bytes/sec
+   */
+  private long bandwidthLimit = Long.MAX_VALUE;
+  private transient long currentBandwidthConsumption;
+  private final transient ScheduledExecutorService scheduler;
+  private final transient Object lock = new Object();
+
+  public BandwidthManager()
+  {
+    scheduler = Executors.newScheduledThreadPool(1);
+  }
+
+  BandwidthManager(ScheduledExecutorService scheduler)
+  {
+    this.scheduler = scheduler;
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    scheduler.scheduleAtFixedRate(new BandwidthAccumulator(), 1, 1, TimeUnit.SECONDS);
+  }
+
+  public boolean canConsumeBandwidth()
+  {
+    if (!isBandwidthRestricted()) {
+      return true;
+    }
+    synchronized (lock) {
+      if (currentBandwidthConsumption >= 0) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public void consumeBandwidth(long sentTupleSize)
+  {
+    if (isBandwidthRestricted()) {
+      synchronized (lock) {
+        currentBandwidthConsumption -= sentTupleSize;
+      }
+    }
+  }
+
+  public boolean isBandwidthRestricted()
+  {
+    if (bandwidthLimit == Long.MAX_VALUE) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * get maximum bandwidth that can be consumed in bytes/sec
+   *
+   * @return
+   */
+  public long getBandwidth()
+  {
+    return bandwidthLimit;
+  }
+
+  /**
+   * Set maximum bandwidth that can be consumed in bytes/sec
+   *
+   * @param bandwidth
+   */
+  public void setBandwidth(long bandwidth)
+  {
+    this.bandwidthLimit = bandwidth;
+    LOG.info("Bandwidth limit is set to: " + bandwidth + " bytes/sec");
+  }
+
+  @Override
+  public void teardown()
+  {
+    scheduler.shutdownNow();
+  }
+
+  class BandwidthAccumulator implements Runnable
--- End diff --

Is there a reason to make this class visible outside of BandwidthManager? 
Please make it either anonymous (preferable) or private.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] apex-malhar pull request #321: APEXMALHAR-2120 #resolve #comment solve probl...

2016-06-22 Thread brightchen
GitHub user brightchen reopened a pull request:

https://github.com/apache/apex-malhar/pull/321

APEXMALHAR-2120 #resolve #comment solve problems of KafkaInputOperato…

…rTest and AbstractKafkaInputOperator

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/brightchen/apex-malhar APEXMALHAR-2120

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/321.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #321


commit 252a70127dbcec3b462ccb714f13a5c7684e4293
Author: brightchen 
Date:   2016-06-14T23:30:17Z

APEXMALHAR-2120 #resolve #comment solve problems of KafkaInputOperatorTest 
and AbstractKafkaInputOperator




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (APEXMALHAR-2124) DTFileTest.seekDTFile failed in Travis

2016-06-22 Thread bright chen (JIRA)
bright chen created APEXMALHAR-2124:
---

 Summary: DTFileTest.seekDTFile failed in Travis
 Key: APEXMALHAR-2124
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2124
 Project: Apache Apex Malhar
  Issue Type: Bug
Reporter: bright chen


This failure doesn't happen very often, but I have hit it 3 times.

Here is some information:

Running org.apache.hadoop.io.file.tfile.TestDTFile
2016-06-21 21:33:46,831 [main] INFO  compress.CodecPool getDecompressor - Got 
brand-new decompressor [.deflate]
2016-06-21 21:33:46,832 [main] INFO  compress.CodecPool getDecompressor - Got 
brand-new decompressor [.deflate]
2016-06-21 21:33:46,832 [main] INFO  compress.CodecPool getDecompressor - Got 
brand-new decompressor [.deflate]
2016-06-21 21:33:46,852 [main] INFO  compress.CodecPool getDecompressor - Got 
brand-new decompressor [.deflate]
2016-06-21 21:33:46,936 [main] INFO  compress.CodecPool getDecompressor - Got 
brand-new decompressor [.deflate]
2016-06-21 21:33:46,937 [main] INFO  compress.CodecPool getDecompressor - Got 
brand-new decompressor [.deflate]
2016-06-21 21:33:46,937 [main] INFO  compress.CodecPool getDecompressor - Got 
brand-new decompressor [.deflate]
2016-06-21 21:33:46,957 [main] INFO  compress.CodecPool getDecompressor - Got 
brand-new decompressor [.deflate]
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.224 sec
Running org.apache.hadoop.io.file.tfile.TestTFileLzoCodecsStreams
Skipped
Skipped
..
Skipped

Failed tests: 
  DTFileTest.seekDTFile:189 Cache contains one more blocks  expected:<5> but 
was:<6>

Tests run: 910, Failures: 1, Errors: 0, Skipped: 0

this is the link of the log:
https://s3.amazonaws.com/archive.travis-ci.org/jobs/139311024/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2045) Add bandwidth control feature to Apex

2016-06-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15345268#comment-15345268
 ] 

ASF GitHub Bot commented on APEXMALHAR-2045:


Github user vrozov commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/279#discussion_r68144992
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthLimitingOperator.java
 ---
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.bandwidth;
+
+import com.datatorrent.api.Operator;
+
+/**
+ * Operator which limits bandwidth consumption. It should have an instance of BandwidthManager.
+ */
+public interface BandwidthLimitingOperator extends Operator
+{
+  public BandwidthManager getBandwidthManager();
--- End diff --

`public` is redundant.
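
Interface members are implicitly public, so the declaration can simply drop the modifier (a minimal sketch of the suggested change):

    public interface BandwidthLimitingOperator extends Operator
    {
      BandwidthManager getBandwidthManager();
    }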


> Add bandwidth control feature to Apex
> -
>
> Key: APEXMALHAR-2045
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2045
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Priyanka Gugale
>Assignee: Priyanka Gugale
>
>  Add bandwidth restrictions on input operators, limiting the number of bytes
> consumed per second.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] apex-malhar pull request #279: APEXMALHAR-2045: Adding bandwidth control fea...

2016-06-22 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/279#discussion_r68144992
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthLimitingOperator.java
 ---
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.bandwidth;
+
+import com.datatorrent.api.Operator;
+
+/**
+ * Operator which limits bandwidth consumption. It should have an instance of BandwidthManager.
+ */
+public interface BandwidthLimitingOperator extends Operator
+{
+  public BandwidthManager getBandwidthManager();
--- End diff --

`public` is redundant.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (APEXCORE-473) Default Unifier without serialization

2016-06-22 Thread Sandesh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15345223#comment-15345223
 ] 

Sandesh commented on APEXCORE-473:
--

Yes, that's what limits the benefits of this approach. 

> Default Unifier without serialization
> -
>
> Key: APEXCORE-473
> URL: https://issues.apache.org/jira/browse/APEXCORE-473
> Project: Apache Apex Core
>  Issue Type: Improvement
>Reporter: Sandesh
>
> When a DefaultUnifier is deployed in a separate container, there is no need 
> to do Serialization/Deserialization of the tuples. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXCORE-473) Default Unifier without serialization

2016-06-22 Thread Thomas Weise (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15345190#comment-15345190
 ] 

Thomas Weise commented on APEXCORE-473:
---

BTW, when you have a shuffle (MxN), the unifier will be deployed with the downstream
operator and the optimization would not help.

> Default Unifier without serialization
> -
>
> Key: APEXCORE-473
> URL: https://issues.apache.org/jira/browse/APEXCORE-473
> Project: Apache Apex Core
>  Issue Type: Improvement
>Reporter: Sandesh
>
> When a DefaultUnifier is deployed in a separate container, there is no need 
> to do Serialization/Deserialization of the tuples. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXCORE-473) Default Unifier without serialization

2016-06-22 Thread Thomas Weise (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15345174#comment-15345174
 ] 

Thomas Weise commented on APEXCORE-473:
---

Since the implementation of that unifier is part of the engine and we know it
is pass-through, we could bypass the operator call. Nice optimization proposal!

> Default Unifier without serialization
> -
>
> Key: APEXCORE-473
> URL: https://issues.apache.org/jira/browse/APEXCORE-473
> Project: Apache Apex Core
>  Issue Type: Improvement
>Reporter: Sandesh
>
> When a DefaultUnifier is deployed in a separate container, there is no need 
> to do Serialization/Deserialization of the tuples. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application

2016-06-22 Thread Chinmay Kolhatkar
I think it's a good idea to have a scheduling operator when you need to
start a part of the DAG when some trigger happens (e.g., FileSplitter
identifying new files in the FS) and otherwise bring it down to save resources.

On Wed, Jun 22, 2016 at 9:54 AM, Timothy Farkas <
timothytiborfar...@gmail.com> wrote:

> I am in agreement with Chandni. Scheduling a batch job is an API completely
> independent of a DAG or an operator. It could be used by a commandline tool
> running on your laptop, a script, or it could happen to be used by an
> Operator running in a DAG and a StatsListener.
>
> On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise 
> wrote:
>
> > Scheduling can be independent, although we have use cases where the
> > scheduling depends on completion of processing (multi-staged batch jobs
> > where unused resources need to be freed).
> >
> > Both can be accomplished with a stats listener.
> >
> > There can be a "scheduling operator" that brings up and removes DAG
> > fragments as needed.
> >
> > On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh 
> > wrote:
> >
> > > Hi,
> > > IMO scheduling a job can be independent of any operator while
> > > StatsListeners are not.  I understand that in a lot of cases input/output
> > > operators will decide when the job ends but there can be cases when
> > > scheduling can be independent of it.
> > >
> > > Thanks,
> > > Chandni
> > > On Jun 21, 2016 12:12 PM, "Thomas Weise"  wrote:
> > >
> > > > This looks like something that coordination-wise belongs into the master
> > > > and can be done with a shared stats listener.
> > > >
> > > > The operator request/response protocol could be used to relay the data
> > > > for the scheduling decisions.
> > > >
> > > > Thomas
> > > >
> > > >
> > > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
> > > > chandni.si...@capitalone.com> wrote:
> > > >
> > > > > Hi Tushar,
> > > > >
> > > > > I have some questions about the use case 2: Batch Support
> > > > > I don't understand the advantages of providing batch support by having
> > > > > an operator as a scheduler.
> > > > >
> > > > > An approach that seemed a little more straightforward to me was to expose
> > > > > an API for scheduler. If there is a scheduler set then the master uses and
> > > > > schedules operators. By default there isn't any scheduler and the job is
> > > > > run as it is now.
> > > > >
> > > > > Maybe this is too simplistic but can you please let me know why having
> > > > > an operator as a scheduler is a better way?
> > > > >
> > > > > Thanks,
> > > > > Chandni
> > > > >
> > > > >
> > > > > >On 6/21/16, 11:09 AM, "Tushar Gosavi"  wrote:
> > > > >
> > > > > >Hi All,
> > > > > >
> > > > > >We have seen a few use cases in the field which require Apex application
> > > > > >scheduling based on some condition. This has also come up as part of
> > > > > >Batch Support in Apex previously
> > > > > >(http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDPXNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E).
> > > > > >I am proposing the following functionality in Apex to help with scheduling
> > > > > >and better resource utilization for batch jobs. Please provide your
> > > > > >comments.
> > > > > >
> > > > > >Usecase 1 - Dynamic Dag modification.
> > > > > >
> > > > > >Each operator in the DAG consumes yarn resources; sometimes it is
> > > > > >desirable to return the resources to yarn when no data is available
> > > > > >for processing, and deploy the whole DAG once data starts to appear. For
> > > > > >this to happen automatically, we will need some data monitoring
> > > > > >operators running in the DAG to trigger restart and shutdown of the
> > > > > >operators in the DAG.
> > > > > >
> > > > > >Apex already has such an api to dynamically change the running dag
> > > > > >through the cli. We could provide a similar API available to operators which
> > > > > >will trigger dag modification at runtime. This information can be
> > > > > >passed to the master using the heartbeat RPC, and the master will make the
> > > > > >required changes to the DAG. Let me know what you think about it..
> > > > > >something like below.
> > > > > >Context.beginDagChange();
> > > > > >context.addOperator("o1") <== launch operator from previous check-pointed
> > > > > >state.
> > > > > >context.addOperator("o2", new Operator2()) <== create new operator
> > > > > >context.addStream("s1", "reader.output", "o1.input");
> > > > > >context.shutdown("o3"); <== delete this and downstream operators from the
> > > > > >DAG.
> > > > > >context.apply();  <== dag changes will be sent to the master, and the
> > > > > >master will apply these changes.
> > > > > >
> > > > > >Similarly API for other functionalities such as locality settings
> > > > > >needs to be 

[jira] [Updated] (APEXMALHAR-2120) Fix bugs on KafkaInputOperatorTest and AbstractKafkaInputOperator

2016-06-22 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2120:

Description: 
problems in Unit Test class: KafkaInputOperatorTest
- 'k' not initialized for each test case
- The assert was not correct
- The test case assumes the END_TUPLE will be received at the end of the normal
tuples, but in fact the tuples could be out of order when multiple clusters or
partitions are supported. Here is one example: 2016-06-22 08:54:12,827 [main] INFO
kafka.KafkaInputOperatorTest testInputOperator - Number of received/expected
tuples: 22/22, testName: testtopic16, tuples:
[c1_2, c1_3, c1_6, c1_9, c1_10, c1_13, c1_14, c1_19, c1_20, END_TUPLE,
END_TUPLE, c1_1, c1_4, c1_5, c1_7, c1_8, c1_11, c1_12, c1_15, c1_16, c1_17,
c1_18]

- The operator AbstractKafkaInputOperator is implemented as "at least once", but
the test case assumes "exactly once"

- RuntimeException: Couldn't replay the offset: see following log.

- RuntimeException: OneToOnePartitioner.assign(OneToOnePartitioner.java:52) or
OneToManyPartitioner.assign(OneToManyPartitioner.java:57), which appears to be
caused by a NullPointerException. See following log


problem of AbstractKafkaInputOperator:

- RuntimeException: Couldn't replay the offset:
For test case KafkaInputOperatorTest.testIdempotentInputOperatorWithFailure()
with scenario {true, true, "one_to_many"}
(multi-cluster: true, multi-partition: true, partition: "one_to_many"), it throws the
following exception and the Collector Module didn't collect any data.
2016-06-15 10:43:56,358 [1/Kafka inputtesttopic0:KafkaSinglePortInputOperator] 
INFO stram.StramLocalCluster log - container-6 msg: Stopped running due to an 
exception. java.lang.RuntimeException: Couldn't replay the offset
at 
org.apache.apex.malhar.kafka.KafkaConsumerWrapper.emitImmediately(KafkaConsumerWrapper.java:146)
at 
org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.replay(AbstractKafkaInputOperator.java:261)
at 
org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.beginWindow(AbstractKafkaInputOperator.java:250)
at com.datatorrent.stram.engine.InputNode.run(InputNode.java:122)
at 
com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1407)
Caused by: org.apache.kafka.clients.consumer.NoOffsetForPartitionException: 
Undefined offset with no reset policy for partition: testtopic0-1
at 
org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)

- ConcurrentModificationException
2016-06-16 10:14:32,400 [1/Kafka inputtesttopic4:KafkaSinglePortInputOperator] 
ERROR engine.StreamingContainer run - Shutdown of operator 
OperatorDeployInfo[id=1,name=Kafka 
inputtesttopic4,type=INPUT,checkpoint={, 0, 
0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=outputPort,streamId=Kafka
 messagetesttopic4,bufferServer=MacBook-Pro-2.local]]] failed due to an 
exception.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
multi-threaded access
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1324)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1255)
at 
org.apache.apex.malhar.kafka.KafkaConsumerWrapper.stop(KafkaConsumerWrapper.java:340)
at 
org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.deactivate(AbstractKafkaInputOperator.java:184)
at com.datatorrent.stram.engine.Node.deactivate(Node.java:646)
at 
com.datatorrent.stram.engine.StreamingContainer.teardownNode(StreamingContainer.java:1347)
at 
com.datatorrent.stram.engine.StreamingContainer.access$500(StreamingContainer.java:130)
at 
com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1438)
2016-06-16 10:14:32,400 [2/Kafka inputtesttopic4:KafkaSinglePortInputOperator] 
ERROR engine.StreamingContainer run - Shutdown of operator 
OperatorDeployInfo[id=2,name=Kafka 
inputtesttopic4,type=INPUT,checkpoint={, 0, 
0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=outputPort,streamId=Kafka
 messagetesttopic4,bufferServer=MacBook-Pro-2.local]]] failed due to an