[jira] [Commented] (APEXMALHAR-2076) AbstractExactlyOnceKafkaOutputOperator didn't handle the orderless of tuples in a window

2016-05-25 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301038#comment-15301038
 ] 

bright chen commented on APEXMALHAR-2076:
-

A new operator, AbstractTupleUniqueExactlyOnceKafkaOutputOperator, was added to 
handle the case where each tuple is unique.

> AbstractExactlyOnceKafkaOutputOperator didn't handle the orderless of tuples 
> in a window
> 
>
> Key: APEXMALHAR-2076
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2076
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: bright chen
>Assignee: bright chen
>
> The order of tuples within the same window is not guaranteed on replay. 
> AbstractExactlyOnceKafkaOutputOperator's logic assumes that replayed tuples 
> arrive in the same order.
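To make the replay problem concrete: skipping already-written tuples by position fails when the replayed window arrives in a different order, while comparing tuple content works if every tuple in the window is unique (the assumption the new tuple-unique operator is named after). The stdlib-only Java sketch below is illustrative; the class and method names (ReplayFilter, skipByPosition, skipByContent) are hypothetical, a List stands in for the messages already found in Kafka, and this is not the Malhar implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; not the Malhar operator code.
class ReplayFilter
{
  // Position-based: assumes the first alreadySent.size() replayed tuples are
  // exactly the ones written before the failure, in the same order.
  static List<String> skipByPosition(List<String> alreadySent, List<String> replayed)
  {
    int skip = Math.min(alreadySent.size(), replayed.size());
    return new ArrayList<>(replayed.subList(skip, replayed.size()));
  }

  // Content-based: valid when every tuple in the window is unique, so a tuple
  // already present in the sink can be skipped wherever it appears in the replay.
  static List<String> skipByContent(List<String> alreadySent, List<String> replayed)
  {
    Set<String> sent = new HashSet<>(alreadySent);
    List<String> toSend = new ArrayList<>();
    for (String tuple : replayed) {
      if (!sent.contains(tuple)) {
        toSend.add(tuple);
      }
    }
    return toSend;
  }

  public static void main(String[] args)
  {
    List<String> alreadySent = Arrays.asList("a", "b");    // written to Kafka before the failure
    List<String> replayed = Arrays.asList("c", "a", "b");  // same window, different order on replay
    System.out.println(skipByPosition(alreadySent, replayed)); // [b]: "c" is lost, "b" is duplicated
    System.out.println(skipByContent(alreadySent, replayed));  // [c]
  }
}
```

If tuples can repeat within a window, a plain set is no longer enough; a count per tuple (a multiset) would be needed instead.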



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: IntelliJ and Netbeans code styles are missing

2016-05-25 Thread Vlad Rozov
With the default IntelliJ project structure it is not necessary to 
import apex-style.jar. The code style definition (as well as the copyright 
template) is preset using files in the .idea folder.


Thank you,
Vlad

On 5/24/16 14:19, Ganelin, Ilya wrote:

I just realized that these were moved to subdirectories but the associated 
documentation was never updated.
I’ll fix that.




On 5/24/16, 2:16 PM, "Ganelin, Ilya"  wrote:


Hi all – I’ve been setting up a new dev environment and noticed that the 
apex-style.jar and apex-style.zip are missing from:
https://github.com/apache/incubator-apex-core/tree/master/misc/ide-templates/intellij
https://github.com/apache/incubator-apex-core/tree/master/misc/ide-templates/netbeans

Did these get lost in the migration from Apache or did we decide to approach 
this a different way?

In recent work I did in the Alluxio project, I created a utility that would 
package and unpack code-style settings, allowing changes to be tracked in GitHub 
while still providing a relatively automated process for setting up configurations. 
Would something like that make sense for Apex?

https://github.com/Alluxio/alluxio/pull/3168
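The pack/unpack idea can be sketched with the JDK's java.util.zip classes: settings stay in the repository as plain, diffable files, and a small tool zips them into an importable archive or extracts an exported one. This is only an illustration under assumed names (StyleArchive, pack, unpack); it is not the Alluxio utility from the linked PR, whose implementation is not shown here.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

// Hypothetical helper: style settings live as plain files in the repo and are
// packed into an archive an IDE can import (or unpacked after an IDE export).
class StyleArchive
{
  // Zips every regular file under srcDir, storing paths relative to srcDir.
  static void pack(Path srcDir, Path zipFile) throws IOException
  {
    List<Path> files;
    try (Stream<Path> walk = Files.walk(srcDir)) {
      files = walk.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
    }
    try (ZipOutputStream zos = new ZipOutputStream(Files.newOutputStream(zipFile))) {
      for (Path file : files) {
        // Zip entry names always use '/' as the separator.
        String name = srcDir.relativize(file).toString().replace('\\', '/');
        zos.putNextEntry(new ZipEntry(name));
        Files.copy(file, zos);
        zos.closeEntry();
      }
    }
  }

  // Extracts zipFile into destDir, rejecting entries that would escape destDir.
  static void unpack(Path zipFile, Path destDir) throws IOException
  {
    Path root = destDir.toAbsolutePath().normalize();
    try (ZipInputStream zis = new ZipInputStream(Files.newInputStream(zipFile))) {
      ZipEntry entry;
      while ((entry = zis.getNextEntry()) != null) {
        Path target = root.resolve(entry.getName()).normalize();
        if (!target.startsWith(root)) {
          throw new IOException("Zip entry outside target directory: " + entry.getName());
        }
        if (entry.isDirectory()) {
          Files.createDirectories(target);
        } else {
          Files.createDirectories(target.getParent());
          Files.copy(zis, target);  // reads only the current entry
        }
      }
    }
  }
}
```

Keeping the unpacked files as the source of truth means code-style changes show up as normal diffs in review, and the archive becomes a build artifact.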



The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates and may only be used solely in performance of 
work or services for Capital One. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed. If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.







[GitHub] incubator-apex-malhar pull request: Apexmalhar 2033 streaming pars...

2016-05-25 Thread devtagare
Github user devtagare commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/288#discussion_r64655954
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/parser/JsonKeyFinder.java ---
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.json.simple.parser.ContentHandler;
+
+import com.google.protobuf.TextFormat.ParseException;
+
+public class JsonKeyFinder implements ContentHandler
--- End diff --

Done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-apex-malhar pull request: Apexmalhar 2033 streaming pars...

2016-05-25 Thread devtagare
Github user devtagare commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/288#discussion_r64655706
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/parser/StreamingJsonParser.java 
---
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.StringTokenizer;
+
+import org.elasticsearch.common.primitives.Ints;
+import org.json.simple.parser.JSONParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.parser.Parser;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Operator that parses a JSON string tuple and emits a POJO on the output port
+ * and tuples that could not be parsed on error port.Upstream operator needs to
+ * ensure that a full JSON record is emitted.
+ * Properties
+ * pojoClass:POJO class 
+ * (optional)fieldMappingStringString of format
+ * fieldNameInJson:fieldNameInPOJO:DataType
+ * Ports 
+ * in:input tuple as a String. Each tuple represents a json string
+ * out:tuples that are validated as per the user defined POJO are emitted
+ * as POJO on this port
+ * err:tuples that could not be parsed are emitted on this port as
+ * KeyValPair
+ * Key being the tuple and Val being the reason
+ * 
+ * @displayName SimpleStreamingJsonParser
+ * @category Parsers
+ * @tags json pojo parser streaming
+ */
+@InterfaceStability.Evolving
+public class StreamingJsonParser extends Parser>
+{
+  private String jsonSchema;
+  private transient JSONParser jsonParser;
+  private String fieldMappingString;
+  private List fieldInfos;
+  private List columnFieldSetters;
+  protected JsonKeyFinder finder;
+  private static final String FIELD_SEPARATOR = ":";
+  private static final String RECORD_SEPARATOR = ",";
+  private ArrayList columnFields;
--- End diff --

Done
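The class Javadoc describes fieldMappingString as records of the form fieldNameInJson:fieldNameInPOJO:DataType, and the fields declare RECORD_SEPARATOR = "," and FIELD_SEPARATOR = ":". A minimal stdlib-only sketch of parsing that format follows; FieldMapping is a hypothetical stand-in, not com.datatorrent.lib.util.FieldInfo or the operator's actual parsing code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a parsed mapping entry; not Malhar's FieldInfo API.
class FieldMapping
{
  final String jsonField;
  final String pojoField;
  final String type;

  FieldMapping(String jsonField, String pojoField, String type)
  {
    this.jsonField = jsonField;
    this.pojoField = pojoField;
    this.type = type;
  }

  // Parses "json:pojo:TYPE,json:pojo:TYPE,..." using the record separator ","
  // and the field separator ":" declared by the operator.
  static List<FieldMapping> parse(String mapping)
  {
    List<FieldMapping> result = new ArrayList<>();
    for (String record : mapping.split(",")) {
      String[] parts = record.trim().split(":");
      if (parts.length != 3) {
        throw new IllegalArgumentException("Expected jsonField:pojoField:TYPE but got '" + record + "'");
      }
      result.add(new FieldMapping(parts[0], parts[1], parts[2]));
    }
    return result;
  }

  @Override
  public String toString()
  {
    return jsonField + "->" + pojoField + "(" + type + ")";
  }
}
```

For example, the test's fieldInfoInitMap value "id:id:INTEGER,name:name:STRING,gpa:gpa:DOUBLE" yields three mappings; failing fast on a malformed record keeps configuration errors out of the tuple path.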




[GitHub] incubator-apex-malhar pull request: Apexmalhar 2033 streaming pars...

2016-05-25 Thread devtagare
Github user devtagare commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/288#discussion_r64655682
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/parser/StreamingJsonParser.java 
---
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.StringTokenizer;
+
+import org.elasticsearch.common.primitives.Ints;
+import org.json.simple.parser.JSONParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.parser.Parser;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Operator that parses a JSON string tuple and emits a POJO on the output port
+ * and tuples that could not be parsed on error port.Upstream operator needs to
+ * ensure that a full JSON record is emitted.
+ * Properties
+ * pojoClass:POJO class 
+ * (optional)fieldMappingStringString of format
+ * fieldNameInJson:fieldNameInPOJO:DataType
+ * Ports 
+ * in:input tuple as a String. Each tuple represents a json string
+ * out:tuples that are validated as per the user defined POJO are emitted
+ * as POJO on this port
+ * err:tuples that could not be parsed are emitted on this port as
+ * KeyValPair
+ * Key being the tuple and Val being the reason
+ * 
+ * @displayName SimpleStreamingJsonParser
+ * @category Parsers
+ * @tags json pojo parser streaming
+ */
+@InterfaceStability.Evolving
+public class StreamingJsonParser extends Parser>
+{
+  private String jsonSchema;
+  private transient JSONParser jsonParser;
+  private String fieldMappingString;
+  private List fieldInfos;
--- End diff --

Done




[jira] [Updated] (APEXCORE-450) Upgrade the version of xml-doclet

2016-05-25 Thread Sandesh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXCORE-450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sandesh updated APEXCORE-450:
-
Assignee: (was: Sandesh)

> Upgrade the version of xml-doclet
> -
>
> Key: APEXCORE-450
> URL: https://issues.apache.org/jira/browse/APEXCORE-450
> Project: Apache Apex Core
>  Issue Type: Improvement
>Reporter: Sandesh
>Priority: Trivial
>
> xml-doclet is used to extract javadoc from classes, but the current 
> version doesn't support extracting javadoc from interfaces.
> Note:
> The only change in the new xml-doclet release is support for extracting 
> javadoc from interfaces, which I fixed a long time back. 





[jira] [Commented] (APEXMALHAR-1966) Cassandra output operator improvements

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300798#comment-15300798
 ] 

ASF GitHub Bot commented on APEXMALHAR-1966:


Github user DT-Priyanka closed the pull request at:

https://github.com/apache/incubator-apex-malhar/pull/227


> Cassandra output operator improvements
> --
>
> Key: APEXMALHAR-1966
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1966
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: Priyanka Gugale
>Assignee: Priyanka Gugale
>
> Update the existing Cassandra output operator to:
> 1. Accept user-defined parameterized queries; the queries could be for update, 
> insert or delete.
> 2. Add an error port to emit tuples which couldn't be written to the database.
> 3. Add metrics.
> 4. Provide a way to restrict the batch size.
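Items 2 to 4 can be illustrated together: buffer tuples up to a configurable maximum, flush when the limit is reached, count successes as a simple metric, and route tuples that fail to write to an error list. The Java sketch below is hypothetical (BatchingWriter, maxBatchSize and the Predicate standing in for statement execution are all assumptions); it is not the Malhar Cassandra operator and does not use the Cassandra driver API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch; not the Malhar Cassandra operator or a Cassandra driver API.
class BatchingWriter<T>
{
  private final int maxBatchSize;
  private final Predicate<T> store;             // stands in for executing a statement; false = write failed
  private final List<T> batch = new ArrayList<>();
  final List<T> errors = new ArrayList<>();     // stands in for an error output port
  int successCount;                             // simple metric
  int flushCount;                               // simple metric

  BatchingWriter(int maxBatchSize, Predicate<T> store)
  {
    if (maxBatchSize <= 0) {
      throw new IllegalArgumentException("maxBatchSize must be positive");
    }
    this.maxBatchSize = maxBatchSize;
    this.store = store;
  }

  // Buffers a tuple and flushes as soon as the batch reaches the configured limit.
  void process(T tuple)
  {
    batch.add(tuple);
    if (batch.size() >= maxBatchSize) {
      flush();
    }
  }

  // Writes the buffered tuples; an operator would also call this at the end of each window.
  void flush()
  {
    if (batch.isEmpty()) {
      return;
    }
    for (T tuple : batch) {
      if (store.test(tuple)) {
        successCount++;
      } else {
        errors.add(tuple);
      }
    }
    batch.clear();
    flushCount++;
  }
}
```

Routing failures per tuple, rather than failing the whole batch, is what lets an error port carry only the rejected records.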





[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1988: Updating cass...

2016-05-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-apex-malhar/pull/186




[GitHub] incubator-apex-site pull request: Added new release announcement f...

2016-05-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-apex-site/pull/40




[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1957: Added threadi...

2016-05-25 Thread bhupeshchawda
Github user bhupeshchawda closed the pull request at:

https://github.com/apache/incubator-apex-malhar/pull/212




[jira] [Updated] (APEXMALHAR-1760) MQTT operator improvements

2016-05-25 Thread Sandesh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-1760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sandesh updated APEXMALHAR-1760:

Assignee: (was: Sandesh)

> MQTT operator improvements
> --
>
> Key: APEXMALHAR-1760
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1760
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Sandesh
>Priority: Minor
>
> 1. The MQTT operator currently does not acknowledge messages, which causes 
> the messages to repeat.
> 2. Add Idempotent Manager support to limit message loss to one window.
> 3. Make sure that this operator is used only in an at-most-once setup.
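The idempotency idea in item 2 is to record which messages were emitted in each window, so that a replayed window re-emits exactly the same tuples instead of pulling new messages from the broker. The sketch below is a simplified, hypothetical illustration (IdempotentSource, emitWindow and maxPerWindow are assumed names, not Malhar's idempotent storage manager API). In a real operator the per-window record must be persisted so it survives the failure; an in-memory map is used here only to keep the example runnable.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical sketch of window-level idempotent replay; not Malhar's API.
class IdempotentSource
{
  private final Queue<String> incoming;  // stands in for the MQTT client's buffer
  // Would need durable storage in practice; in-memory here for illustration.
  private final Map<Long, List<String>> emittedByWindow = new HashMap<>();

  IdempotentSource(Queue<String> incoming)
  {
    this.incoming = incoming;
  }

  // Returns the tuples for a window: the recorded tuples if the window is being
  // replayed, otherwise up to maxPerWindow new messages, which are then recorded.
  List<String> emitWindow(long windowId, int maxPerWindow)
  {
    List<String> recorded = emittedByWindow.get(windowId);
    if (recorded != null) {
      return new ArrayList<>(recorded);  // replay: same tuples as before the failure
    }
    List<String> batch = new ArrayList<>();
    while (batch.size() < maxPerWindow && !incoming.isEmpty()) {
      batch.add(incoming.poll());
    }
    emittedByWindow.put(windowId, new ArrayList<>(batch));
    return batch;
  }
}
```

Messages received but not yet recorded when a failure occurs are still lost, which is why the item describes reducing loss to at most one window rather than eliminating it.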





[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2087 Hive output mo...

2016-05-25 Thread tweise
Github user tweise commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/289#discussion_r64593672
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/hive/FSPojoToHiveOperator.java ---
@@ -40,14 +43,14 @@
  * @tags fs, hive, database
  * @since 3.0.0
  */
-public class FSPojoToHiveOperator extends AbstractFSRollingOutputOperator
+public class FSPojoToHiveOperator extends AbstractFSRollingOutputOperator
--- End diff --

What's the purpose of the type parameter?




[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2087 Hive output mo...

2016-05-25 Thread tweise
Github user tweise commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/289#discussion_r64593874
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/hive/HiveOutputModule.java ---
@@ -0,0 +1,476 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.contrib.hive;
--- End diff --

New classes should be created in the appropriate package.




[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2087 Hive output mo...

2016-05-25 Thread yogidevendra
GitHub user yogidevendra opened a pull request:

https://github.com/apache/incubator-apex-malhar/pull/289

APEXMALHAR-2087 Hive output module

1. Added Hive output module
2. Minor enhancements for getters, setters
3. Fixing some import order, checkstyle violations

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yogidevendra/incubator-apex-malhar 
APEXMALHAR-2087-hive-output-module-PR1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-apex-malhar/pull/289.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #289


commit b4d8ec5c312f81b9fc9ab3abd6e4cb148a1f3e12
Author: yogidevendra 
Date:   2016-05-10T09:58:17Z

APEXMALHAR-2087 Hive output module
1. Added Hive output module
2. Minor enhancements for getters, setters
3. Fixing some import order, checkstyle violations






[jira] [Commented] (APEXMALHAR-2087) Hive output module

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15299935#comment-15299935
 ] 

ASF GitHub Bot commented on APEXMALHAR-2087:


GitHub user yogidevendra opened a pull request:

https://github.com/apache/incubator-apex-malhar/pull/289

APEXMALHAR-2087 Hive output module

1. Added Hive output module
2. Minor enhancements for getters, setters
3. Fixing some import order, checkstyle violations

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yogidevendra/incubator-apex-malhar 
APEXMALHAR-2087-hive-output-module-PR1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-apex-malhar/pull/289.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #289


commit b4d8ec5c312f81b9fc9ab3abd6e4cb148a1f3e12
Author: yogidevendra 
Date:   2016-05-10T09:58:17Z

APEXMALHAR-2087 Hive output module
1. Added Hive output module
2. Minor enhancements for getters, setters
3. Fixing some import order, checkstyle violations




> Hive output module
> --
>
> Key: APEXMALHAR-2087
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2087
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: Yogi Devendra
>Assignee: Yogi Devendra
>
> Currently, applications which need to write to Hive have to include two 
> operators, FSPojoToHiveOperator and HiveOperator from malhar-contrib, in the 
> application DAG. [The first operator writes to HDFS; the second operator loads 
> files from HDFS into Hive.]
> Proposal: wrap these two operators into a Module/CompositeOperator so that 
> the end user can directly include this module in the application.
> that, end user can directly include this module into application.





[GitHub] incubator-apex-malhar pull request: Apexmalhar 2033 streaming pars...

2016-05-25 Thread shubham-pathak22
Github user shubham-pathak22 commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/288#discussion_r64560543
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/parser/StreamingJsonParser.java 
---
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.StringTokenizer;
+
+import org.elasticsearch.common.primitives.Ints;
+import org.json.simple.parser.JSONParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.parser.Parser;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Operator that parses a JSON string tuple and emits a POJO on the output port
+ * and tuples that could not be parsed on error port.Upstream operator needs to
+ * ensure that a full JSON record is emitted.
+ * Properties
+ * pojoClass:POJO class 
+ * (optional)fieldMappingStringString of format
+ * fieldNameInJson:fieldNameInPOJO:DataType
+ * Ports 
+ * in:input tuple as a String. Each tuple represents a json string
+ * out:tuples that are validated as per the user defined POJO are emitted
+ * as POJO on this port
+ * err:tuples that could not be parsed are emitted on this port as
+ * KeyValPair
+ * Key being the tuple and Val being the reason
+ * 
+ * @displayName SimpleStreamingJsonParser
+ * @category Parsers
+ * @tags json pojo parser streaming
+ */
+@InterfaceStability.Evolving
+public class StreamingJsonParser extends Parser>
+{
+  private String jsonSchema;
+  private transient JSONParser jsonParser;
+  private String fieldMappingString;
+  private List fieldInfos;
+  private List columnFieldSetters;
+  protected JsonKeyFinder finder;
+  private static final String FIELD_SEPARATOR = ":";
+  private static final String RECORD_SEPARATOR = ",";
+  private ArrayList columnFields;
+  private transient Class pojoClass;
+
+  /**
+   * @return POJO class
+   */
+  private Class getPojoClass()
+  {
+return pojoClass;
+  }
+
+  /**
+   * Sets the POJO class
+   */
+  public void setPojoClass(Class pojoClass)
--- End diff --

Setters and getters for pojoClass are not required. The class should be set 
in the setup method of the port.
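A minimal sketch of the suggested pattern: the POJO class is supplied once at setup time rather than through a public setter, and is then used reflectively to build tuples. In an Apex output port the class would presumably come from the port's setup(PortContext), e.g. context.getValue(Context.PortContext.TUPLE_CLASS) (assumed here); the sketch takes a plain Class<?> so it runs without Apex. PojoPopulator and Student are hypothetical names.

```java
import java.lang.reflect.Field;
import java.util.Map;

// Example tuple class used for illustration.
class Student
{
  int id;
  String name;
}

// Hypothetical sketch: the POJO class arrives at setup time instead of via a setter.
class PojoPopulator
{
  private Class<?> pojoClass;

  // Stands in for the output port's setup(PortContext), which would read the
  // TUPLE_CLASS attribute (assumption); a plain parameter keeps this runnable.
  void setup(Class<?> tupleClass)
  {
    this.pojoClass = tupleClass;
  }

  // Creates a POJO and assigns each map entry to the field of the same name.
  Object toPojo(Map<String, Object> values)
  {
    try {
      Object pojo = pojoClass.getDeclaredConstructor().newInstance();
      for (Map.Entry<String, Object> entry : values.entrySet()) {
        Field field = pojoClass.getDeclaredField(entry.getKey());
        field.setAccessible(true);
        field.set(pojo, entry.getValue());  // unboxes wrappers for primitive fields
      }
      return pojo;
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot populate " + pojoClass.getName(), e);
    }
  }
}
```

Taking the class from the port keeps it consistent with what the downstream operator expects, and a transient field then needs no public accessor.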




[GitHub] incubator-apex-malhar pull request: Apexmalhar 2033 streaming pars...

2016-05-25 Thread shubham-pathak22
Github user shubham-pathak22 commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/288#discussion_r64559707
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/parser/StreamingJsonParser.java 
---
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.StringTokenizer;
+
+import org.elasticsearch.common.primitives.Ints;
+import org.json.simple.parser.JSONParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.parser.Parser;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Operator that parses a JSON string tuple and emits a POJO on the output port
+ * and tuples that could not be parsed on error port.Upstream operator needs to
+ * ensure that a full JSON record is emitted.
+ * Properties
+ * pojoClass:POJO class 
+ * (optional)fieldMappingStringString of format
+ * fieldNameInJson:fieldNameInPOJO:DataType
+ * Ports 
+ * in:input tuple as a String. Each tuple represents a json string
+ * out:tuples that are validated as per the user defined POJO are emitted
+ * as POJO on this port
+ * err:tuples that could not be parsed are emitted on this port as
+ * KeyValPair
+ * Key being the tuple and Val being the reason
+ * 
+ * @displayName SimpleStreamingJsonParser
+ * @category Parsers
+ * @tags json pojo parser streaming
+ */
+@InterfaceStability.Evolving
+public class StreamingJsonParser extends Parser>
+{
+  private String jsonSchema;
+  private transient JSONParser jsonParser;
+  private String fieldMappingString;
+  private List fieldInfos;
+  private List columnFieldSetters;
+  protected JsonKeyFinder finder;
+  private static final String FIELD_SEPARATOR = ":";
+  private static final String RECORD_SEPARATOR = ",";
+  private ArrayList columnFields;
+  private transient Class pojoClass;
+
+  /**
+   * @return POJO class
+   */
+  private Class getPojoClass()
+  {
+return pojoClass;
+  }
+
+  /**
+   * Sets the POJO class
+   */
+  public void setPojoClass(Class pojoClass)
+  {
+this.pojoClass = pojoClass;
+  }
+
+  /**
+   * Returns a string representing mapping from generic record to POJO fields
+   */
+  public String getFieldMappingString()
+  {
+return fieldMappingString;
+  }
+
+  /**
+   * Comma separated list mapping a field in JSON schema to POJO field eg :
+   * fieldName:INTEGER,fieldName:DOUBLE
+   */
+  public void setFieldMappingString(String genericRecordToPOJOFieldsMapping)
+  {
+this.fieldMappingString = genericRecordToPOJOFieldsMapping;
+  }
+
+  public String getJsonSchema()
+  {
+return jsonSchema;
+  }
+
+  public void setJsonSchema(String jsonSchema)
+  {
+this.jsonSchema = jsonSchema;
+  }
+
+  public StreamingJsonParser()
+  {
+
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+jsonParser = new JSONParser();
+

[GitHub] incubator-apex-malhar pull request: Apexmalhar 2033 streaming pars...

2016-05-25 Thread shubham-pathak22
Github user shubham-pathak22 commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/288#discussion_r64557644
  
--- Diff: contrib/src/test/java/com/datatorrent/contrib/parser/StreamingJsonParserTest.java ---
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.util.List;
+import java.util.ListIterator;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.helper.TestPortContext;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+public class StreamingJsonParserTest
+{
+  public static final String fieldInfoInitMap = "id:id:INTEGER," + "name:name:STRING," + "gpa:gpa:DOUBLE";
+
+  public static final String nestedFieldInfoMap = "id:id:INTEGER," + "name:name:STRING," + "gpa:gpa:DOUBLE,"
+      + "streetAddress:streetAddress:STRING," + "city:city:STRING," + "state:state:STRING,"
+      + "postalCode:postalCode:STRING";
+
+  public static final String invalidFieldInfoMap = "Field1:id:INTEGER," + "name:name:STRING," + "gpa:gpa:DOUBLE";
+
+  CollectorTestSink outputSink = new CollectorTestSink();
+  CollectorTestSink errorSink = new CollectorTestSink();
+
+  StreamingJsonParser jsonParser = new StreamingJsonParser();
+
+  private List recordList = null;
+
+  public class TestMeta extends TestWatcher
+  {
+    Context.OperatorContext context;
+    Context.PortContext portContext;
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      Attribute.AttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      portAttributes.put(Context.PortContext.TUPLE_CLASS, Person.class);
+      portContext = new TestPortContext(portAttributes);
+      super.starting(description);
+      jsonParser.output.setSink(outputSink);
+      jsonParser.err.setSink(errorSink);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      jsonParser.teardown();
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testReads() throws Exception
+  {
+    int count = 5;
+    createReaderInput(count);
+    jsonParser.setPojoClass(Person.class);
+    jsonParser.setFieldMappingString(fieldInfoInitMap);
+    jsonParser.setup(testMeta.context);
+    jsonParser.output.setup(testMeta.portContext);
+
+    jsonParser.beginWindow(0);
+    ListIterator itr = recordList.listIterator();
+    while (itr.hasNext()) {
+      jsonParser.in.process(itr.next().getBytes());
+    }
+    jsonParser.endWindow();
+
+    Assert.assertEquals("Number of tuples", count, outputSink.collectedTuples.size());
+    Person obj = (Person)outputSink.collectedTuples.get(0);
+    Assert.assertEquals("Name is", "name-5", obj.getName());
+    jsonParser.teardown();
+  }
+
+  @Test
+  public void testNestedReads() throws Exception
+  {
+    int count = 4;
+    createReaderInput(count);
+    jsonParser.setPojoClass(Person.class);
+    jsonParser.setFieldMappingString(nestedFieldInfoMap);
+    jsonParser.setup(testMeta.context);
+    jsonParser.output.setup(testMeta.portContext);
+
+    jsonParser.beginWindow(0);
+    ListIterator itr = recordList.listIterator();
+    while (itr.hasNext()) {
+      jsonParser.in.process(itr.next().getBytes());
+    }
+    jsonParser.endWindow();
+    Person obj = (Person)outputSink.collectedTuples.get(0);
+    Assert.assertEquals("Number of tuples", count, outputSink.collectedTuples.size());
+

[GitHub] incubator-apex-malhar pull request: Apexmalhar 2033 streaming pars...

2016-05-25 Thread shubham-pathak22
Github user shubham-pathak22 commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/288#discussion_r64539994
  
--- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/StreamingJsonParser.java ---
@@ -0,0 +1,451 @@
+package com.datatorrent.contrib.parser;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.StringTokenizer;
+
+import org.json.simple.parser.JSONParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.parser.Parser;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Operator that parses a JSON string tuple and emits a POJO on the output port
+ * and tuples that could not be parsed on the error port. The upstream operator
+ * needs to ensure that a full JSON record is emitted.
+ * Properties
+ * pojoClass: POJO class
+ * (optional) fieldMappingString: String of format
+ * fieldNameInJson:fieldNameInPOJO:DataType
+ * Ports
+ * in: input tuple as a String. Each tuple represents a JSON string
+ * out: tuples that are validated as per the user-defined POJO are emitted
+ * as POJO on this port
+ * err: tuples that could not be parsed are emitted on this port as
+ * KeyValPair
+ * Key being the tuple and Val being the reason
+ * 
+ * @displayName SimpleStreamingJsonParser
+ * @category Parsers
+ * @tags json pojo parser streaming
+ */
+@InterfaceStability.Evolving
+public class StreamingJsonParser extends Parser>
+{
+  private String jsonSchema;
+  private transient JSONParser jsonParser;
+  private String fieldMappingString;
+  private List fieldInfos;
+  private List columnFieldSetters;
+  protected JsonKeyFinder finder;
+  private static final String FIELD_SEPARATOR = ":";
+  private static final String RECORD_SEPARATOR = ",";
+  private ArrayList columnFields;
+  private transient Class pojoClass;
+
+  /**
+   * @return POJO class
+   */
+  private Class getPojoClass()
+  {
+    return pojoClass;
+  }
+
+  /**
+   * Sets the POJO class
+   */
+  public void setPojoClass(Class pojoClass)
+  {
+    this.pojoClass = pojoClass;
+  }
+
+  /**
+   * Returns a string representing mapping from generic record to POJO fields
+   */
+  public String getFieldMappingString()
+  {
+    return fieldMappingString;
+  }
+
+  /**
+   * Comma separated list mapping a field in JSON schema to POJO field eg :
+   * fieldName:INTEGER,fieldName:DOUBLE
+   */
--- End diff --

The "eg :" example needs to be corrected.
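For reference, the class-level Javadoc and the test constants (for example fieldInfoInitMap = "id:id:INTEGER,...") use a three-part fieldNameInJson:fieldNameInPOJO:DataType format, while the setter's example shows only two parts per entry. The sketch below is a minimal, hypothetical parser for the three-part format, assuming entries are separated by "," and parts by ":" (matching RECORD_SEPARATOR and FIELD_SEPARATOR in the diff). The class FieldMappingSketch and the MappedType enum are illustrative only; they are not the operator's code, which uses FieldInfo.SupportType.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

public class FieldMappingSketch
{
  // Hypothetical subset of type names for illustration only; the operator
  // itself relies on com.datatorrent.lib.util.FieldInfo.SupportType.
  enum MappedType { BOOLEAN, DOUBLE, FLOAT, INTEGER, LONG, SHORT, STRING }

  // One parsed entry: JSON key, POJO field name, and declared type.
  static final class Mapping
  {
    final String jsonField;
    final String pojoField;
    final MappedType type;

    Mapping(String jsonField, String pojoField, MappedType type)
    {
      this.jsonField = jsonField;
      this.pojoField = pojoField;
      this.type = type;
    }

    @Override
    public String toString()
    {
      return jsonField + "->" + pojoField + "(" + type + ")";
    }
  }

  // Parses "jsonField:pojoField:TYPE[,jsonField:pojoField:TYPE...]".
  static List<Mapping> parse(String mappingString)
  {
    List<Mapping> mappings = new ArrayList<>();
    for (String entry : mappingString.split(",")) {
      String[] parts = entry.trim().split(":");
      // Reject entries that do not have exactly three non-empty name parts.
      if (parts.length != 3 || parts[0].trim().isEmpty() || parts[1].trim().isEmpty()) {
        throw new IllegalArgumentException(
            "Expected jsonField:pojoField:TYPE but got '" + entry + "'");
      }
      // Enum.valueOf throws IllegalArgumentException for an unknown type name.
      MappedType type = MappedType.valueOf(parts[2].trim().toUpperCase(Locale.ROOT));
      mappings.add(new Mapping(parts[0].trim(), parts[1].trim(), type));
    }
    return Collections.unmodifiableList(mappings);
  }

  public static void main(String[] args)
  {
    // Same shape as fieldInfoInitMap in StreamingJsonParserTest.
    System.out.println(parse("id:id:INTEGER,name:name:STRING,gpa:gpa:DOUBLE"));
    // The two-part form from the setter Javadoc is rejected by this sketch.
    try {
      parse("fieldName:INTEGER,fieldName:DOUBLE");
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Running main prints the three parsed mappings and then reports that the two-part setter example is rejected, which is the inconsistency a corrected example would remove.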


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---