[GitHub] apex-core pull request #534: YarnClient to get containerInfo

2017-05-31 Thread devtagare
GitHub user devtagare opened a pull request:

https://github.com/apache/apex-core/pull/534

YarnClient to get containerInfo

@PramodSSImmaneni could you please review and merge?

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/devtagare/incubator-apex-core APEXCORE-736

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/534.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #534


commit f0fa410e5156fa9ea25c6447495a0e3f40cc6c21
Author: devtagare <devtag...@gmail.com>
Date:   2017-05-31T21:09:00Z

YarnClient to get containerInfo




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] apex-core pull request #526: APEXCORE-712 custom keystore at launch

2017-05-12 Thread devtagare
GitHub user devtagare opened a pull request:

https://github.com/apache/apex-core/pull/526

APEXCORE-712 custom keystore at launch



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/devtagare/incubator-apex-core APEXCORE-712

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/526.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #526


commit 5b9c759dda9252f2066b6b6c69b87a21a1cec067
Author: devtagare <devtag...@gmail.com>
Date:   2017-05-12T22:04:22Z

APEXCORE-712 custom keystore at launch






[GitHub] apex-core pull request #495: APEXCORE-682 Fix for getting StramWebServicesIn...

2017-03-24 Thread devtagare
GitHub user devtagare opened a pull request:

https://github.com/apache/apex-core/pull/495

APEXCORE-682 Fix for getting StramWebServicesInfo when app is launched from user given path

@PramodSSImmaneni could you please review and merge?

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/devtagare/incubator-apex-core APEXCORE-682

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/495.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #495


commit f7338e23bb6c41378cdeffaeee50b6a2a1568c4c
Author: devtagare <devtag...@gmail.com>
Date:   2017-03-24T23:59:43Z

APEXCORE-682 Fix for getting StramWebServicesInfo when app is launched from user given path






[GitHub] apex-core pull request #467: APEXCORE-636 - user level kerberos support

2017-02-07 Thread devtagare
GitHub user devtagare opened a pull request:

https://github.com/apache/apex-core/pull/467

APEXCORE-636 - user level kerberos support

@PramodSSImmaneni could you please review?

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/devtagare/incubator-apex-core APEXCORE-636

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/467.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #467


commit dfe1a23224092c63dbff3b3652199310cb709f7b
Author: devtagare <devtag...@gmail.com>
Date:   2017-02-08T02:42:17Z

APEXCORE-636 - user level kerberos support






[GitHub] apex-malhar pull request #348: APEXMALHAR-2157 Json formatter improvements

2016-07-20 Thread devtagare
GitHub user devtagare commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/348#discussion_r71555740
  
--- Diff: library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java ---
@@ -35,6 +35,8 @@
 
 import org.apache.commons.io.FileUtils;
 
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.Lists;
 
--- End diff --

Please update the test cases here to include additional assertions based on 
the AutoMetric counters that were added.




[GitHub] apex-malhar pull request #340: APEXMALHAR-2140 ActiveFieldInfo moved to Abst...

2016-07-12 Thread devtagare
GitHub user devtagare opened a pull request:

https://github.com/apache/apex-malhar/pull/340

APEXMALHAR-2140 ActiveFieldInfo moved to AbstractJdbcPOJOOutputOperator


@amberarrow could you please review the changes?

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/devtagare/incubator-apex-malhar APEXMALHAR-2140

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/340.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #340


commit 883153fad0321a8ef9efc8601a6b8703c0991d29
Author: devtagare <devtag...@gmail.com>
Date:   2016-07-12T20:40:39Z

APEXMALHAR-2140 ActiveFieldInfo moved to AbstractJdbcPOJOOutputOperator






[GitHub] apex-malhar pull request #282: APEXMALHAR-2066 JdbcPolling,idempotent,partit...

2016-07-12 Thread devtagare
GitHub user devtagare commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/282#discussion_r70488149
  
--- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---
@@ -0,0 +1,652 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.validation.constraints.Min;
+
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.commons.lang3.tuple.MutablePair;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.Operator.IdleTimeHandler;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.lib.db.AbstractStoreInputOperator;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Abstract operator for for consuming data using JDBC interface
+ * User needs User needs to provide
+ * tableName,dbConnection,setEmitColumnList,look-up key 
+ * Optionally batchSize,pollInterval,Look-up key and a where clause can be given
+ *
+ * This operator uses static partitioning to arrive at range queries for exactly
+ * once reads
+ * Assumption is that there is an ordered column using which range queries can
+ * be formed
+ * If an emitColumnList is provided, please ensure that the keyColumn is the
+ * first column in the list
+ * Range queries are formed using the {@link JdbcMetaDataUtility}} Output -
+ * comma separated list of the emit columns eg columnA,columnB,columnC
+ * 
+ * @displayName Jdbc Polling Input Operator
+ * @category Input
+ * @tags database, sql, jdbc, partitionable,exactlyOnce
+ */
+public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator<T, JdbcStore>
--- End diff --

Done
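
For reference, the static range partitioning that the quoted javadoc describes
can be sketched as follows. This is only an illustration under stated
assumptions, not the code from the pull request: the class name RangeSketch,
the method splitRanges, and the use of row offsets instead of key values are
all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeSketch
{
  /**
   * Splits rowCount rows into exactly `partitions` contiguous,
   * non-overlapping [lower, upper) offset ranges. Earlier ranges
   * absorb the remainder, so sizes differ by at most one row.
   */
  public static List<long[]> splitRanges(long rowCount, int partitions)
  {
    if (rowCount < 0 || partitions <= 0) {
      throw new IllegalArgumentException("rowCount must be >= 0 and partitions > 0");
    }
    List<long[]> ranges = new ArrayList<>();
    long base = rowCount / partitions;
    long remainder = rowCount % partitions;
    long lower = 0;
    for (int i = 0; i < partitions; i++) {
      long size = base + (i < remainder ? 1 : 0);
      ranges.add(new long[] {lower, lower + size});
      lower += size;
    }
    return ranges;
  }

  public static void main(String[] args)
  {
    for (long[] r : splitRanges(10, 3)) {
      System.out.println("[" + r[0] + ", " + r[1] + ")");
    }
  }
}
```

Because the ranges are fixed up front and do not overlap, each partition can
replay its own range after a failure without reading rows that belong to
another partition.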




[GitHub] apex-malhar pull request #282: APEXMALHAR-2066 JdbcPolling,idempotent,partit...

2016-07-12 Thread devtagare
GitHub user devtagare commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/282#discussion_r70483447
  
--- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java ---
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * A utility class used to retrieve the metadata for a given unique key of a SQL
+ * table. This class would emit range queries based on a primary index given
+ * 
+ * @Input - dbName,tableName, primaryKey
+ * @Output - map<operatorId,prepared statement>
+ *
+ */
+public class JdbcMetaDataUtility
+{
+  private static String DB_DRIVER = "com.mysql.jdbc.Driver";
+  private static String DB_CONNECTION = "";
+  private static String DB_USER = "";
+  private static String DB_PASSWORD = "";
+  private static String TABLE_NAME = "";
+  private static String KEY_COLUMN = "";
+  private static String WHERE_CLAUSE = null;
+  private static String COLUMN_LIST = null;
+
+  private static Logger LOG = LoggerFactory.getLogger(JdbcMetaDataUtility.class);
+
+  public JdbcMetaDataUtility()
+  {
+
+  }
+
+  public JdbcMetaDataUtility(String dbConnection, String tableName, String key, String userName, String password)
+  {
+    DB_CONNECTION = dbConnection;
+    DB_USER = userName;
+    DB_PASSWORD = password;
+    TABLE_NAME = tableName;
+    KEY_COLUMN = key;
+  }
+
+  private static Connection getDBConnection()
+  {
+
+    Connection dbConnection = null;
+
+    try {
+      Class.forName(DB_DRIVER);
+    } catch (ClassNotFoundException e) {
+      LOG.error("Driver not found", e);
+    }
+
+    try {
+      dbConnection = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
+      return dbConnection;
+    } catch (SQLException e) {
+      LOG.error("Exception in getting connection handle", e);
+    }
+
+    return dbConnection;
+
+  }
+
+  private static String generateQueryString()
+  {
+    StringBuilder sb = new StringBuilder();
+    sb.append("SELECT COUNT(*) as RowCount from " + TABLE_NAME);
+
+    if (WHERE_CLAUSE != null) {
+      sb.append(" WHERE " + WHERE_CLAUSE);
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * Finds the total number of rows in the table
+   */
+  private static long getRecordRange(String query) throws SQLException
+  {
+    long rowCount = 0;
+    Connection dbConnection = null;
+    PreparedStatement preparedStatement = null;
+
+    try {
+      dbConnection = getDBConnection();
+      preparedStatement = dbConnection.prepareStatement(query);
+
+      ResultSet rs = preparedStatement.executeQuery();
+
+      while (rs.next()) {
+        rowCount = Long.parseLong(rs.getString("RowCount"));
+        LOG.info("# Rows - " + rowCount);
+      }
+
+    } catch (SQLException e) {
+      LOG.error("Exception in retreiving result set", e);
+    } finally {
+      if (preparedStatement != null) {
+        preparedStatement.close();
+      }
+      if (dbConnection != null) {
+        dbConnection.close();
+      }
+    }
+    return rowCount;
+  }
+
+  /**
+   * Returns a pair of <upper,lower> bounds for each partition of the
+   * {@link JdbcPollInputOperator}}
+   */

[GitHub] apex-malhar pull request #282: APEXMALHAR-2066 JdbcPolling,idempotent,partit...

2016-07-12 Thread devtagare
GitHub user devtagare commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/282#discussion_r70483210
  
--- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java ---
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * A utility class used to retrieve the metadata for a given unique key of a SQL
+ * table. This class would emit range queries based on a primary index given
+ * 
+ * @Input - dbName,tableName, primaryKey
+ * @Output - map<operatorId,prepared statement>
+ *
+ */
+public class JdbcMetaDataUtility
+{
+  private static String DB_DRIVER = "com.mysql.jdbc.Driver";
+  private static String DB_CONNECTION = "";
+  private static String DB_USER = "";
+  private static String DB_PASSWORD = "";
+  private static String TABLE_NAME = "";
+  private static String KEY_COLUMN = "";
+  private static String WHERE_CLAUSE = null;
+  private static String COLUMN_LIST = null;
+
+  private static Logger LOG = LoggerFactory.getLogger(JdbcMetaDataUtility.class);
+
+  public JdbcMetaDataUtility()
+  {
+
+  }
+
+  public JdbcMetaDataUtility(String dbConnection, String tableName, String key, String userName, String password)
+  {
+    DB_CONNECTION = dbConnection;
+    DB_USER = userName;
+    DB_PASSWORD = password;
+    TABLE_NAME = tableName;
+    KEY_COLUMN = key;
+  }
+
+  private static Connection getDBConnection()
+  {
+
+    Connection dbConnection = null;
+
+    try {
+      Class.forName(DB_DRIVER);
+    } catch (ClassNotFoundException e) {
+      LOG.error("Driver not found", e);
--- End diff --

Done





[GitHub] apex-malhar pull request #282: APEXMALHAR-2066 JdbcPolling,idempotent,partit...

2016-07-12 Thread devtagare
GitHub user devtagare commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/282#discussion_r70483252
  
--- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java ---
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * A utility class used to retrieve the metadata for a given unique key of a SQL
+ * table. This class would emit range queries based on a primary index given
+ * 
+ * @Input - dbName,tableName, primaryKey
+ * @Output - map<operatorId,prepared statement>
+ *
+ */
+public class JdbcMetaDataUtility
+{
+  private static String DB_DRIVER = "com.mysql.jdbc.Driver";
+  private static String DB_CONNECTION = "";
+  private static String DB_USER = "";
+  private static String DB_PASSWORD = "";
+  private static String TABLE_NAME = "";
+  private static String KEY_COLUMN = "";
+  private static String WHERE_CLAUSE = null;
+  private static String COLUMN_LIST = null;
+
+  private static Logger LOG = LoggerFactory.getLogger(JdbcMetaDataUtility.class);
+
+  public JdbcMetaDataUtility()
+  {
+
+  }
+
+  public JdbcMetaDataUtility(String dbConnection, String tableName, String key, String userName, String password)
+  {
+    DB_CONNECTION = dbConnection;
+    DB_USER = userName;
+    DB_PASSWORD = password;
+    TABLE_NAME = tableName;
+    KEY_COLUMN = key;
+  }
+
+  private static Connection getDBConnection()
+  {
+
+    Connection dbConnection = null;
+
+    try {
+      Class.forName(DB_DRIVER);
+    } catch (ClassNotFoundException e) {
+      LOG.error("Driver not found", e);
+    }
+
+    try {
+      dbConnection = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
+      return dbConnection;
+    } catch (SQLException e) {
+      LOG.error("Exception in getting connection handle", e);
--- End diff --

Done
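
The quoted getDBConnection logs a failure and then returns null, which defers
the error to the first caller that dereferences the connection. One possible
alternative is to propagate the failure at the point where it occurs. The
sketch below is an assumption about how that could look, not the change that
was actually made in the pull request; ConnectionSketch and open are
hypothetical names.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class ConnectionSketch
{
  /**
   * Loads the driver and opens a connection, or throws an unchecked
   * exception that wraps the original cause. Never returns null.
   */
  public static Connection open(String driverClass, String url, String user, String password)
  {
    try {
      Class.forName(driverClass);
      return DriverManager.getConnection(url, user, password);
    } catch (ClassNotFoundException | SQLException e) {
      throw new IllegalStateException("Unable to open JDBC connection to " + url, e);
    }
  }

  public static void main(String[] args)
  {
    try {
      // Hypothetical driver class and URL, chosen so that the call fails.
      open("no.such.Driver", "jdbc:none:example", "user", "password");
    } catch (IllegalStateException e) {
      System.out.println("Failed fast: " + e.getCause());
    }
  }
}
```

Callers can then use try-with-resources on the returned connection without a
null check.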





[GitHub] apex-malhar pull request #298: *For review only* [APEXMALHAR-2086] Kafka out...

2016-06-10 Thread devtagare
GitHub user devtagare commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/298#discussion_r66696821
  
--- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractExactlyOnceKafkaOutputOperator.java ---
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+
+/**
+ * Kafka output operator with exactly once processing semantics under certain conditions.,
+ *
+ *  Requirement for Exactly Once:
+ *   Every message within the Window is unique
+ *
+ *  This operator uses *Key* to distinguish the messages written by particular instance of the Output operator.
+ *  Operator users can only use *value* for storing the data.
+ *
+ * @displayName Abstract Exactly Once Kafka Output(0.9.0)
+ * @category Messaging
+ * @tags output operator
+ *
+ * @since 3.5
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public abstract class AbstractExactlyOnceKafkaOutputOperator extends AbstractKafkaOutputOperator<String, T>
+    implements Operator.CheckpointNotificationListener
+{
+  private transient String key;
+  private transient String appId;
+  private transient Integer operatorId;
+  private transient Long windowId;
+  private transient Map<T, Integer> partialWindowTuples = new HashMap<>();
+  private transient KafkaConsumer consumer;
+
+  private WindowDataManager windowDataManager = new FSWindowDataManager();
+  private final int KAFKA_CONNECT_ATTEMPT = 10;
+  private final String KEY_SEPARATOR = "#";
+  private final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+  private final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+
+    this.operatorId = context.getId();
+    this.windowDataManager.setup(context);
+    this.appId = context.getValue(Context.DAGContext.APPLICATION_ID);
+    this.key = appId + KEY_SEPARATOR + (new Integer(operatorId));
+    this.consumer = KafkaConsumerInit();
+
+    setProperty(ACKS_CONFIG, "all");
+  }
+
--- End diff --

Can't we have leader-only acknowledgement as the default, i.e. acks=1?
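
For context, acks is a Kafka producer setting: "all" waits for the full set of
in-sync replicas to acknowledge a write, while "1" waits for the leader only.
One way to keep a default without overriding what the user configured is
sketched below. AcksSketch and producerProps are hypothetical names used for
illustration and are not part of the pull request; the literal key "acks" is
the value of ProducerConfig.ACKS_CONFIG.

```java
import java.util.Properties;

public class AcksSketch
{
  /**
   * Copies the user-supplied properties and applies defaultAcks only
   * when the user has not set "acks" explicitly.
   */
  public static Properties producerProps(Properties userProps, String defaultAcks)
  {
    Properties props = new Properties();
    props.putAll(userProps);
    if (!props.containsKey("acks")) {
      props.setProperty("acks", defaultAcks);
    }
    return props;
  }

  public static void main(String[] args)
  {
    Properties user = new Properties();
    System.out.println(producerProps(user, "1").getProperty("acks"));
    user.setProperty("acks", "all");
    System.out.println(producerProps(user, "1").getProperty("acks"));
  }
}
```

Whether a leader-only default is acceptable depends on how much durability the
exactly-once recovery path needs, which this sketch does not address.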




[GitHub] apex-malhar pull request #298: *For review only* [APEXMALHAR-2086] Kafka out...

2016-06-10 Thread devtagare
GitHub user devtagare commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/298#discussion_r66696282
  
--- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractExactlyOnceKafkaOutputOperator.java ---
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+
+/**
+ * Kafka output operator with exactly once processing semantics under certain conditions.,
+ *
+ *  Requirement for Exactly Once:
+ *   Every message within the Window is unique
+ *
+ *  This operator uses *Key* to distinguish the messages written by particular instance of the Output operator.
+ *  Operator users can only use *value* for storing the data.
+ *
+ * @displayName Abstract Exactly Once Kafka Output(0.9.0)
+ * @category Messaging
+ * @tags output operator
+ *
+ * @since 3.5
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public abstract class AbstractExactlyOnceKafkaOutputOperator extends AbstractKafkaOutputOperator<String, T>
+    implements Operator.CheckpointNotificationListener
+{
+  private transient String key;
+  private transient String appId;
+  private transient Integer operatorId;
+  private transient Long windowId;
+  private transient Map<T, Integer> partialWindowTuples = new HashMap<>();
+  private transient KafkaConsumer consumer;
+
+  private WindowDataManager windowDataManager = new FSWindowDataManager();
+  private final int KAFKA_CONNECT_ATTEMPT = 10;
+  private final String KEY_SEPARATOR = "#";
+  private final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+  private final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+
+    this.operatorId = context.getId();
+    this.windowDataManager.setup(context);
+    this.appId = context.getValue(Context.DAGContext.APPLICATION_ID);
+    this.key = appId + KEY_SEPARATOR + (new Integer(operatorId));
+    this.consumer = KafkaConsumerInit();
--- End diff --

A more deterministic key would be
this.key = (new Integer(operatorId)) + partitionId, where partitionId is the
Kafka partition, since you only want to peek into the partitions that this
operator has written to. Also, APP_ID is a constant and does not contribute to
uniqueness.
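
The suggestion can be sketched as below. KeySketch and messageKey are
hypothetical names, and keeping the "#" separator from the quoted code is an
assumption of this sketch: without a separator, operator 1 with partition 12
and operator 11 with partition 2 would both yield "112".

```java
public class KeySketch
{
  private static final String KEY_SEPARATOR = "#";

  /**
   * Builds a message key from the operator id and the Kafka partition
   * the message is written to.
   */
  public static String messageKey(int operatorId, int kafkaPartition)
  {
    return operatorId + KEY_SEPARATOR + kafkaPartition;
  }

  public static void main(String[] args)
  {
    System.out.println(messageKey(1, 12));
    System.out.println(messageKey(11, 2));
  }
}
```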




[GitHub] apex-malhar pull request #298: *For review only* [APEXMALHAR-2086] Kafka out...

2016-06-10 Thread devtagare
GitHub user devtagare commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/298#discussion_r66696137
  
--- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractExactlyOnceKafkaOutputOperator.java ---
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+
+/**
+ * Kafka output operator with exactly once processing semantics under certain conditions.
+ *
+ *  Requirement for Exactly Once:
+ *   Every message within the Window is unique
+ *
+ *  This operator uses *Key* to distinguish the messages written by a particular instance of the Output operator.
+ *  Operator users can only use *value* for storing the data.
+ *
+ * @displayName Abstract Exactly Once Kafka Output(0.9.0)
+ * @category Messaging
+ * @tags output operator
+ *
+ * @since 3.5
--- End diff --

Why must the records within a window be unique?
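
One possible reason, stated here as an assumption and not as the PR's confirmed design: if recovery deduplicates by comparing replayed tuples against what was already written for the partially completed window, two equal messages in the same window cannot be told apart. A sketch with hypothetical names:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of value-based deduplication after a failure.
final class ReplayDedupSketch
{
  private ReplayDedupSketch()
  {
  }

  // Returns the replayed tuples that still need to be written,
  // skipping every tuple whose value was already written before the failure.
  static List<String> pending(List<String> alreadyWritten, List<String> replayed)
  {
    Set<String> seen = new HashSet<>(alreadyWritten);
    List<String> out = new ArrayList<>();
    for (String tuple : replayed) {
      if (!seen.contains(tuple)) {
        out.add(tuple);
      }
    }
    return out;
  }
}
```

If the window legitimately contains the value "a" twice and only one copy was written before the failure, this comparison drops the second copy as well, which is the case the uniqueness requirement would rule out.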




[GitHub] apex-malhar pull request #313: Apexmalhar 2113 jdbc pojo output operator

2016-06-07 Thread devtagare
Github user devtagare closed the pull request at:

https://github.com/apache/apex-malhar/pull/313




[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2066 JdbcPolling,idempotent,p...

2016-05-31 Thread devtagare
Github user devtagare commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/282#discussion_r65223757
  
--- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---
@@ -0,0 +1,652 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.validation.constraints.Min;
+
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.commons.lang3.tuple.MutablePair;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.Operator.IdleTimeHandler;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.lib.db.AbstractStoreInputOperator;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Abstract operator for consuming data using the JDBC interface
+ * User needs to provide
+ * tableName, dbConnection, setEmitColumnList, look-up key
+ * Optionally batchSize, pollInterval, look-up key and a where clause can be given
+ *
+ * This operator uses static partitioning to arrive at range queries for exactly
+ * once reads
+ * Assumption is that there is an ordered column using which range queries can
+ * be formed
+ * If an emitColumnList is provided, please ensure that the keyColumn is the
+ * first column in the list
+ * Range queries are formed using the {@link JdbcMetaDataUtility} Output -
+ * comma separated list of the emit columns eg columnA,columnB,columnC
+ * 
+ * @displayName Jdbc Polling Input Operator
+ * @category Input
+ * @tags database, sql, jdbc, partitionable,exactlyOnce
+ */
+public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator<T, JdbcStore>
--- End diff --

This operator supports polling, partitioning, and idempotency.
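
The static range partitioning described in the Javadoc can be sketched as follows. This is an illustration under assumptions: a numeric ordered key column, known minimum and maximum key values, and hypothetical class and method names. The PR's JdbcMetaDataUtility may form its queries differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of splitting an ordered key range into per-partition queries.
final class RangeQuerySketch
{
  private RangeQuerySketch()
  {
  }

  // Splits the inclusive key range [min, max] into at most `partitions`
  // contiguous, non-overlapping range queries.
  static List<String> rangeQueries(String table, String keyColumn, long min, long max, int partitions)
  {
    if (partitions < 1 || max < min) {
      throw new IllegalArgumentException("need partitions >= 1 and max >= min");
    }
    List<String> queries = new ArrayList<>();
    long total = max - min + 1;
    long base = total / partitions;
    long remainder = total % partitions;
    long lower = min;
    for (int i = 0; i < partitions; i++) {
      // The first `remainder` partitions take one extra key each.
      long size = base + (i < remainder ? 1 : 0);
      if (size == 0) {
        continue; // more partitions than keys: nothing left to assign
      }
      long upper = lower + size - 1;
      queries.add("SELECT * FROM " + table + " WHERE " + keyColumn + " >= " + lower
          + " AND " + keyColumn + " <= " + upper);
      lower = upper + 1;
    }
    return queries;
  }
}
```

Because the ranges are fixed and do not overlap, each partition can re-run its own query after a failure and read the same rows again, which is what makes replay idempotent in this sketch.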




[GitHub] incubator-apex-malhar pull request: Apexmalhar 2033 streaming pars...

2016-05-27 Thread devtagare
Github user devtagare commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/288#discussion_r64865714
  
--- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/StreamingJsonParser.java ---
@@ -0,0 +1,431 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.StringTokenizer;
+
+import org.elasticsearch.common.primitives.Ints;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.parser.Parser;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Operator that parses a JSON string tuple and emits a POJO on the output port
+ * and tuples that could not be parsed on error port. Upstream operator needs to
+ * ensure that a full JSON record is emitted.
+ * Properties
+ * pojoClass: POJO class
+ * (optional) fieldMappingString: String of format
+ * fieldNameInJson:fieldNameInPOJO:DataType
+ * Ports
+ * in: input tuple as a String. Each tuple represents a json string
+ * out: tuples that are validated as per the user defined POJO are emitted
+ * as POJO on this port
+ * err: tuples that could not be parsed are emitted on this port as
+ * KeyValPair<String,String>
+ * Key being the tuple and Val being the reason
+ * 
+ * @displayName SimpleStreamingJsonParser
+ * @category Parsers
+ * @tags json pojo parser streaming
+ */
+@InterfaceStability.Evolving
+public class StreamingJsonParser extends Parser<byte[], KeyValPair<String, String>>
+{
+  private transient JSONParser jsonParser;
+  private transient String fieldMappingString;
+  private transient List fieldInfos;
+  private transient List columnFieldSetters;
+  protected JsonKeyFinder finder;
+  private static final String FIELD_SEPARATOR = ":";
+  private static final String RECORD_SEPARATOR = ",";
+  private transient ArrayList columnFields;
+  private transient Class pojoClass;
+
+  /**
+   * @return POJO class
+   */
+  private Class getPojoClass()
+  {
+    return pojoClass;
+  }
+
+  /**
+   * Sets the POJO class
+   */
+  public void setPojoClass(Class pojoClass)
+  {
+    this.pojoClass = pojoClass;
+  }
+
+  /**
+   * Returns a string representing mapping from generic record to POJO fields
+   */
+  public String getFieldMappingString()
+  {
+    return fieldMappingString;
+  }
+
+  /**
+   * Comma separated list mapping a field in JSON schema to POJO field eg :
+   * fieldNameInPOJO:fieldNameInJSON:DataType
+   */
+  public void setFieldMappingString(String pojoFieldsToJsonMapping)
+  {
+    this.fieldMappingString = pojoFieldsToJsonMapping;
+  }
+
+  public StreamingJsonParser()
+  {
+
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    jsonParser = new JSONParser();
+    finder = new JsonKeyFinder();
+    columnFields = new ArrayList();
+columnFieldSette

[GitHub] incubator-apex-malhar pull request: Apexmalhar 2033 streaming pars...

2016-05-25 Thread devtagare
Github user devtagare commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/288#discussion_r64655954
  
--- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/JsonKeyFinder.java ---
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.json.simple.parser.ContentHandler;
+
+import com.google.protobuf.TextFormat.ParseException;
+
+public class JsonKeyFinder implements ContentHandler
--- End diff --

Done




[GitHub] incubator-apex-malhar pull request: Apexmalhar 2033 streaming pars...

2016-05-25 Thread devtagare
Github user devtagare commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/288#discussion_r64655706
  
--- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/StreamingJsonParser.java ---
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.StringTokenizer;
+
+import org.elasticsearch.common.primitives.Ints;
+import org.json.simple.parser.JSONParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.parser.Parser;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Operator that parses a JSON string tuple and emits a POJO on the output port
+ * and tuples that could not be parsed on error port. Upstream operator needs to
+ * ensure that a full JSON record is emitted.
+ * Properties
+ * pojoClass: POJO class
+ * (optional) fieldMappingString: String of format
+ * fieldNameInJson:fieldNameInPOJO:DataType
+ * Ports
+ * in: input tuple as a String. Each tuple represents a json string
+ * out: tuples that are validated as per the user defined POJO are emitted
+ * as POJO on this port
+ * err: tuples that could not be parsed are emitted on this port as
+ * KeyValPair<String,String>
+ * Key being the tuple and Val being the reason
+ * 
+ * @displayName SimpleStreamingJsonParser
+ * @category Parsers
+ * @tags json pojo parser streaming
+ */
+@InterfaceStability.Evolving
+public class StreamingJsonParser extends Parser<byte[], KeyValPair<String, String>>
+{
+  private String jsonSchema;
+  private transient JSONParser jsonParser;
+  private String fieldMappingString;
+  private List fieldInfos;
+  private List columnFieldSetters;
+  protected JsonKeyFinder finder;
+  private static final String FIELD_SEPARATOR = ":";
+  private static final String RECORD_SEPARATOR = ",";
+  private ArrayList columnFields;
--- End diff --

Done
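
For readers of this thread, the fieldMappingString format from the class Javadoc (comma-separated records of fieldNameInJson:fieldNameInPOJO:DataType) can be parsed roughly as below. This is a sketch under assumptions: the class, the Mapping holder, and the parse method are hypothetical, and the PR itself uses FieldInfo and a StringTokenizer. Only the two separator constants are taken from the diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of parsing the mapping string; not the PR's implementation.
final class FieldMappingSketch
{
  // Separator values as shown in the diff.
  static final String FIELD_SEPARATOR = ":";
  static final String RECORD_SEPARATOR = ",";

  // Simple holder for one mapping entry (the PR uses FieldInfo instead).
  static final class Mapping
  {
    final String jsonField;
    final String pojoField;
    final String dataType;

    Mapping(String jsonField, String pojoField, String dataType)
    {
      this.jsonField = jsonField;
      this.pojoField = pojoField;
      this.dataType = dataType;
    }
  }

  private FieldMappingSketch()
  {
  }

  // Splits the string into records, then each record into its three fields.
  static List<Mapping> parse(String mapping)
  {
    List<Mapping> result = new ArrayList<>();
    for (String record : mapping.split(RECORD_SEPARATOR)) {
      String[] parts = record.trim().split(FIELD_SEPARATOR);
      if (parts.length != 3) {
        throw new IllegalArgumentException("Expected json:pojo:type but got: " + record);
      }
      result.add(new Mapping(parts[0].trim(), parts[1].trim(), parts[2].trim()));
    }
    return result;
  }
}
```

For example, parse("id:customerId:INTEGER,name:customerName:STRING") yields two entries. The field order here follows the class-level Javadoc; the setter's Javadoc in the same diff lists the POJO field first, so the order would need to be confirmed against the code.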




[GitHub] incubator-apex-malhar pull request: Apexmalhar 2033 streaming pars...

2016-05-25 Thread devtagare
Github user devtagare commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/288#discussion_r64655682
  
--- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/StreamingJsonParser.java ---
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.StringTokenizer;
+
+import org.elasticsearch.common.primitives.Ints;
+import org.json.simple.parser.JSONParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.parser.Parser;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Operator that parses a JSON string tuple and emits a POJO on the output port
+ * and tuples that could not be parsed on error port. Upstream operator needs to
+ * ensure that a full JSON record is emitted.
+ * Properties
+ * pojoClass: POJO class
+ * (optional) fieldMappingString: String of format
+ * fieldNameInJson:fieldNameInPOJO:DataType
+ * Ports
+ * in: input tuple as a String. Each tuple represents a json string
+ * out: tuples that are validated as per the user defined POJO are emitted
+ * as POJO on this port
+ * err: tuples that could not be parsed are emitted on this port as
+ * KeyValPair<String,String>
+ * Key being the tuple and Val being the reason
+ * 
+ * @displayName SimpleStreamingJsonParser
+ * @category Parsers
+ * @tags json pojo parser streaming
+ */
+@InterfaceStability.Evolving
+public class StreamingJsonParser extends Parser<byte[], KeyValPair<String, String>>
+{
+  private String jsonSchema;
+  private transient JSONParser jsonParser;
+  private String fieldMappingString;
+  private List fieldInfos;
--- End diff --

Done




[GitHub] incubator-apex-malhar pull request: Apexmalhar 2033 streaming pars...

2016-05-24 Thread devtagare
GitHub user devtagare opened a pull request:

https://github.com/apache/incubator-apex-malhar/pull/288

Apexmalhar 2033 streaming parser

Streaming JSON parser

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/devtagare/incubator-apex-malhar APEXMALHAR-2033-StreamingParser

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-apex-malhar/pull/288.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #288


commit b661aa50d71acd93dac6673dd4aa6907b291b88a
Author: devtagare <devtag...@gmail.com>
Date:   2016-05-24T23:52:42Z

APEXMALHAR-2033-StreamingParser JSON streaming parser

commit 962303b0f6c5cb5f9079cd06465ceac1922492a6
Author: devtagare <devtag...@gmail.com>
Date:   2016-05-24T23:58:45Z

APEXMALHAR-2033-StreamingParser JSON streaming parser

commit 673d24bb3dce3f9cb2ce6b450c78818a547bc63f
Author: devtagare <devtag...@gmail.com>
Date:   2016-05-25T00:13:35Z

APEXMALHAR-2033-StreamingParser , checkstyle fix

commit bc01823a8d48db464e7e7a6739952b81be97db8f
Author: devtagare <devtag...@gmail.com>
Date:   2016-05-25T00:17:01Z

APEXMALHAR-2033-StreamingParser streaming JSON parser

commit 4a61de8e02e75b3d9637cf37953f476d7493d81d
Author: devtagare <devtag...@gmail.com>
Date:   2016-05-25T00:26:48Z

APEXMALHAR-2033-StreamingParser streaming JSON parser,checkstyle fix



