[GitHub] apex-malhar pull request #642: APEXMALHAR-2497 APEXMALHAR-2162 1) Refactor t...

2017-07-10 Thread chaithu14
GitHub user chaithu14 opened a pull request:

https://github.com/apache/apex-malhar/pull/642

APEXMALHAR-2497 APEXMALHAR-2162 1) Refactor the Exactly Once output 
operator. 2) Refactor and fix the issues of unit tests.

@tushargosavi @tweise Please review and merge.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2497-Kafka10Exactly

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/642.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #642


commit 54565e070c0754c6a86ff1091f33765e40aab9af
Author: chaitanya <chai...@apache.org>
Date:   2017-07-10T06:50:12Z

APEXMALHAR-2497 APEXMALHAR-2162 1) Refactor the Exactly Once output 
operator. 2) Refactor and fix the issues of unit tests.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] apex-malhar pull request #623: APEXMALHAR-2494 Added description for the dem...

2017-05-19 Thread chaithu14
GitHub user chaithu14 opened a pull request:

https://github.com/apache/apex-malhar/pull/623

APEXMALHAR-2494 Added description for the demo apps

@yogidevendra Please review and merge.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2494-UpdateDesc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/623.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #623


commit 239361cd7abe585c02b4ef781eaa8320733bc445
Author: chaitanya <chai...@apache.org>
Date:   2017-05-19T08:38:52Z

APEXMALHAR-2494 Added description for the demo apps






[GitHub] apex-malhar pull request #622: APEXMALHAR-2493 Fixed the issue of KafkaSingl...

2017-05-18 Thread chaithu14
GitHub user chaithu14 opened a pull request:

https://github.com/apache/apex-malhar/pull/622

APEXMALHAR-2493 Fixed the issue of KafkaSinglePortExactlyOnceOutputOperator 
going to the blocked state during recovery

@sandeshh @tushargosavi Please review and merge.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2493-KafkaExactlyCBBug

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/622.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #622


commit c784f4da46d1cf594aa4156135b9c196aa66d931
Author: chaitanya <chai...@apache.org>
Date:   2017-05-18T10:37:52Z

APEXMALHAR-2493 Fixed the issue of KafkaSinglePortExactlyOnceOutputOperator 
going to the blocked state during recovery






[GitHub] apex-malhar pull request #614: APEXMALHAR-2484 Support of PartFileWriter for...

2017-05-07 Thread chaithu14
Github user chaithu14 closed the pull request at:

https://github.com/apache/apex-malhar/pull/614




[GitHub] apex-malhar pull request #614: APEXMALHAR-2484 Support of PartFileWriter for...

2017-04-27 Thread chaithu14
GitHub user chaithu14 opened a pull request:

https://github.com/apache/apex-malhar/pull/614

APEXMALHAR-2484 Support of PartFileWriter for writing the part files



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2484-partfilewriter

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/614.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #614


commit d45ea93d8db24af6181d55a5b4e2a13148e4629b
Author: chaitanya <chai...@apache.org>
Date:   2017-04-27T10:00:09Z

APEXMALHAR-2484 Support of PartFileWriter for writing the part files






[GitHub] apex-malhar pull request #611: APEXMALHAR-2484 Support of PartFileWriter for...

2017-04-27 Thread chaithu14
Github user chaithu14 closed the pull request at:

https://github.com/apache/apex-malhar/pull/611




[GitHub] apex-malhar pull request #611: APEXMALHAR-2484 Support of PartFileWriter for...

2017-04-26 Thread chaithu14
GitHub user chaithu14 opened a pull request:

https://github.com/apache/apex-malhar/pull/611

APEXMALHAR-2484 Support of PartFileWriter for writing the part files

@shubham-pathak22 @yogidevendra Please review and merge.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2484-partfilewriter

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/611.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #611


commit 6761f7d1efc9ab34076c625cdc0684c8de200151
Author: chaitanya <chai...@apache.org>
Date:   2017-04-26T09:40:52Z

APEXMALHAR-2484 PartFileWriter for writing the part files in specified 
directory






[GitHub] apex-malhar pull request #600: APEXMALHAR-2459 1)Refactor the existing Kafka...

2017-04-05 Thread chaithu14
GitHub user chaithu14 opened a pull request:

https://github.com/apache/apex-malhar/pull/600

APEXMALHAR-2459 1)Refactor the existing Kafka Input Operator. 2)Added the 
support of KafkaInputOperator using 0.10 consumer API



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2459-Kafkainput

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/600.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #600








[GitHub] apex-malhar pull request #529: APEXMALHAR-2364 Added Documentation for S3Outp...

2016-12-27 Thread chaithu14
GitHub user chaithu14 opened a pull request:

https://github.com/apache/apex-malhar/pull/529

APEXMALHAR-2364 Added Documentation for S3Output Module

@amberarrow Please review and merge.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2364-S3OutputDoc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/529.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #529


commit a8c8cc293bb67dc80a49f29db271e485cb98567c
Author: chaitanya <chai...@apache.org>
Date:   2016-12-27T10:01:22Z

APEXMALHAR-2364 Added Documentation for S3Output Module






[GitHub] apex-malhar pull request #483: APEXMALHAR-2022 Developed S3 Output Module

2016-11-30 Thread chaithu14
GitHub user chaithu14 reopened a pull request:

https://github.com/apache/apex-malhar/pull/483

APEXMALHAR-2022 Developed S3 Output Module



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2022-S3Output-multiPart

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/483.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #483


commit a5e8fa3facca750f5d7402c2c29e7cbabe53bd9e
Author: chaitanya <chai...@apache.org>
Date:   2016-11-30T05:17:36Z

APEXMALHAR-2022 Development of S3 Output Module






[GitHub] apex-malhar pull request #483: APEXMALHAR-2022 Developed S3 Output Module

2016-11-30 Thread chaithu14
Github user chaithu14 closed the pull request at:

https://github.com/apache/apex-malhar/pull/483




[GitHub] apex-malhar pull request #483: APEXMALHAR-2022 Developed S3 Output Module

2016-11-30 Thread chaithu14
GitHub user chaithu14 reopened a pull request:

https://github.com/apache/apex-malhar/pull/483

APEXMALHAR-2022 Developed S3 Output Module



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2022-S3Output-multiPart

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/483.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #483


commit a5e8fa3facca750f5d7402c2c29e7cbabe53bd9e
Author: chaitanya <chai...@apache.org>
Date:   2016-11-30T05:17:36Z

APEXMALHAR-2022 Development of S3 Output Module






[GitHub] apex-malhar pull request #483: APEXMALHAR-2022 Developed S3 Output Module

2016-11-30 Thread chaithu14
Github user chaithu14 closed the pull request at:

https://github.com/apache/apex-malhar/pull/483




[GitHub] apex-malhar pull request #483: APEXMALHAR-2022 Developed S3 Output Module

2016-11-28 Thread chaithu14
GitHub user chaithu14 reopened a pull request:

https://github.com/apache/apex-malhar/pull/483

APEXMALHAR-2022 Developed S3 Output Module



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2022-S3Output-multiPart

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/483.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #483


commit 6ab63bd92dc93ac4ddb3d6ce70d310cfa9322f82
Author: chaitanya <chai...@apache.org>
Date:   2016-11-28T08:54:54Z

APEXMALHAR-2022 Development of S3 Output Module






[GitHub] apex-malhar pull request #483: APEXMALHAR-2022 Developed S3 Output Module

2016-11-28 Thread chaithu14
Github user chaithu14 closed the pull request at:

https://github.com/apache/apex-malhar/pull/483




[GitHub] apex-malhar pull request #485: APEXMALHAR-2284 *Review Only*

2016-11-08 Thread chaithu14
GitHub user chaithu14 opened a pull request:

https://github.com/apache/apex-malhar/pull/485

APEXMALHAR-2284 *Review Only* 

1) Implemented SpillableMap, SpillableArrayList, SpillableArrayListMultiMap 
over TimeSlicedBucketState 2) Integrated Spillable data structure into Inner 
Join operator

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2284-SPDOverTime

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/485.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #485


commit 991e5660f52c266e75440fc0b7c03aa3eb4c05fe
Author: chaitanya <chai...@apache.org>
Date:   2016-11-08T12:57:16Z

APEXMALHAR-2284 *Review Only* 1) Implemented SpillableMap, SpillableArrayList, SpillableArrayListMultiMap over TimeSlicedBucketState 2) Integrated Spillable data structure into Inner Join operator






[GitHub] apex-malhar pull request #483: APEXMALHAR-2022 Developed S3 Output Module

2016-11-04 Thread chaithu14
GitHub user chaithu14 opened a pull request:

https://github.com/apache/apex-malhar/pull/483

APEXMALHAR-2022 Developed S3 Output Module



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2022-S3Output-multiPart

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/483.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #483


commit 24fb5638ecb6f0e45edb5d5f640b220ad9372fcc
Author: chaitanya <chai...@apache.org>
Date:   2016-11-04T12:48:21Z

APEXMALHAR-2022 Developed S3 Output Module






[GitHub] apex-malhar pull request #479: APEXMALHAR-2325 1) Set the file system defaul...

2016-11-02 Thread chaithu14
GitHub user chaithu14 opened a pull request:

https://github.com/apache/apex-malhar/pull/479

APEXMALHAR-2325 1) Set the file system default block size to the reader. 2) 
Set the block size to the reader context



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2325-SameBlockID

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/479.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #479


commit f06e3d8b79edbcdcd6c084e989a6bcbc87e210f9
Author: chaitanya <chai...@apache.org>
Date:   2016-11-02T12:27:43Z

APEXMALHAR-2325 1) Set the file system default block size to the reader. 2) 
Set the block size to the reader context






[GitHub] apex-malhar pull request #467: APEXMALHAR-2257 Added documentation of Transf...

2016-10-24 Thread chaithu14
GitHub user chaithu14 opened a pull request:

https://github.com/apache/apex-malhar/pull/467

APEXMALHAR-2257 Added documentation of Transform Operator



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2257-transformDoc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/467.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #467


commit 04883eeaff504f62af64dd8419596a72ef636be0
Author: chaitanya <chai...@apache.org>
Date:   2016-10-24T08:08:24Z

APEXMALHAR-2257 Added documentation of Transform Operator






[GitHub] apex-malhar pull request #453: APEXMALHAR-2284 Disabled the POJOInnerJoinOpe...

2016-10-14 Thread chaithu14
GitHub user chaithu14 opened a pull request:

https://github.com/apache/apex-malhar/pull/453

APEXMALHAR-2284 Disabled the POJOInnerJoinOperatorTest



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2284

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/453.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #453


commit 83ba1a6b5bb3648bf801bb9995de62892e04e1ae
Author: chaitanya <chai...@apache.org>
Date:   2016-10-14T05:50:05Z

APEXMALHAR-2284 Disabled the POJOInnerJoinOperatorTest






[GitHub] apex-malhar pull request #420: APEXMALHAR-2242 Added user documentation for ...

2016-09-21 Thread chaithu14
GitHub user chaithu14 opened a pull request:

https://github.com/apache/apex-malhar/pull/420

APEXMALHAR-2242 Added user documentation for 0.9 version of Kafka Input 
Operator



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2242-kafka0.9doc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/420.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #420


commit 263c92545d12f6c21625d76e4a18a2ebcae933a8
Author: chaitanya <chai...@apache.org>
Date:   2016-09-21T06:44:50Z

APEXMALHAR-2242 Added user documentation for 0.9 version of Kafka Input 
Operator






[GitHub] apex-malhar pull request #401: APEXMALHAR-2226 Fixed the Not supported excep...

2016-09-07 Thread chaithu14
GitHub user chaithu14 opened a pull request:

https://github.com/apache/apex-malhar/pull/401

APEXMALHAR-2226 Fixed the Not supported exception while re-deploying the 
AbstractFileOutput Operator



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2226-FS-Append

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/401.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #401


commit ce7e574b211b74eb60065a67f98ec5e99b09e022
Author: chaitanya <chai...@apache.org>
Date:   2016-09-07T09:08:31Z

APEXMALHAR-2226 Fixed the Not supported exception while re-deploying the 
AbstractFileOutput Operator






[GitHub] apex-malhar pull request #379: APEXMALHAR-2134 Fix the NullPointerException,...

2016-08-26 Thread chaithu14
Github user chaithu14 closed the pull request at:

https://github.com/apache/apex-malhar/pull/379




[GitHub] apex-malhar pull request #379: APEXMALHAR-2134 Fix the NullPointerException,...

2016-08-26 Thread chaithu14
GitHub user chaithu14 reopened a pull request:

https://github.com/apache/apex-malhar/pull/379

APEXMALHAR-2134 Fix the NullPointerException, if the kafka partition has no 
leader broker



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2134-NP-LB

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/379.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #379


commit 0da1765f4ff12fcda1461f3af06a57450ba8fb4c
Author: chaitanya <chai...@apache.org>
Date:   2016-08-25T19:21:43Z

APEXMALHAR-2134 Fix the NullPointerException, if the kafka partition has no 
leader broker






[GitHub] apex-malhar pull request #380: APEXMALHAR-2199 Added support for chroot zook...

2016-08-25 Thread chaithu14
GitHub user chaithu14 opened a pull request:

https://github.com/apache/apex-malhar/pull/380

APEXMALHAR-2199 Added support for chroot zookeeper path in Kafka Input Operator



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2199-zkPath

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/380.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #380


commit 82a6acbbd4c46e3e940d25366a2ca84c7dea4b8e
Author: chaitanya <chai...@apache.org>
Date:   2016-08-25T18:58:31Z

APEXMALHAR-2199 Added support for chroot zookeeper path in Kafka Input Operator






[GitHub] apex-malhar pull request #379: APEXMALHAR-2134 Fix the NullPointerException,...

2016-08-25 Thread chaithu14
GitHub user chaithu14 opened a pull request:

https://github.com/apache/apex-malhar/pull/379

APEXMALHAR-2134 Fix the NullPointerException, if the kafka partition has no 
leader broker



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2134-NP-LB

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/379.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #379


commit 8b662fb11f0b77373d302c58b4522011806bbfa9
Author: chaitanya <chai...@apache.org>
Date:   2016-08-25T17:41:17Z

APEXMALHAR-2134 Fix the NullPointerException, if the kafka partition has no 
leader broker






[GitHub] apex-malhar pull request #373: APEXMALHAR-2154 Update the Kafka Input Operat...

2016-08-18 Thread chaithu14
GitHub user chaithu14 opened a pull request:

https://github.com/apache/apex-malhar/pull/373

APEXMALHAR-2154 Update the Kafka Input Operator to use 
CheckpointNotificationListener



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2154

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/373.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #373


commit 9d15fe29287697ff2b660e637c495d76a01ab910
Author: chaitanya <chai...@apache.org>
Date:   2016-08-18T07:23:40Z

APEXMALHAR-2154 Update the Kafka Input Operator to use 
CheckpointNotificationListener






[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-18 Thread chaithu14
GitHub user chaithu14 reopened a pull request:

https://github.com/apache/apex-malhar/pull/330

APEXMALHAR-2100 Implementation of Inner Join operator



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2100-InnerJoin

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/330.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #330


commit 755aadae8814887431cd66b55bd1e03591afb5c6
Author: Chaitanya <chaita...@datatorrent.com>
Date:   2016-08-17T10:22:24Z

APEXMALHAR-2100 Implementation of Inner Join operator






[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-17 Thread chaithu14
GitHub user chaithu14 reopened a pull request:

https://github.com/apache/apex-malhar/pull/330

APEXMALHAR-2100 Implementation of Inner Join operator



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2100-InnerJoin

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/330.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #330


commit 363287a2cca2a0a5dbb9096b4d971719e71d5342
Author: Chaitanya <chaita...@datatorrent.com>
Date:   2016-08-17T07:08:23Z

APEXMALHAR-2100 Implementation of Inner Join operator






[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-17 Thread chaithu14
Github user chaithu14 closed the pull request at:

https://github.com/apache/apex-malhar/pull/330




[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-11 Thread chaithu14
Github user chaithu14 closed the pull request at:

https://github.com/apache/apex-malhar/pull/330




[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-11 Thread chaithu14
GitHub user chaithu14 reopened a pull request:

https://github.com/apache/apex-malhar/pull/330

APEXMALHAR-2100 Implementation of Inner Join operator



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2100-InnerJoin

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/330.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #330


commit 22e165c2cc8710c4a5135d64cb6a684cf0866c07
Author: Chaitanya <chaita...@datatorrent.com>
Date:   2016-08-10T14:01:28Z

APEXMALHAR-2100 Implementation of Inner Join operator






[GitHub] apex-malhar pull request #360: APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 r...

2016-08-08 Thread chaithu14
Github user chaithu14 closed the pull request at:

https://github.com/apache/apex-malhar/pull/360




[GitHub] apex-malhar pull request #360: APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 r...

2016-08-08 Thread chaithu14
GitHub user chaithu14 reopened a pull request:

https://github.com/apache/apex-malhar/pull/360

APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 reader issue



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2174-S3-ReaderIssue

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/360.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #360


commit 0c70e92e6f2a1a631569d6b9608ac79de0a50b96
Author: Chaitanya <chaita...@datatorrent.com>
Date:   2016-08-08T06:21:11Z

APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 reader issue






[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-08 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844379
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java ---
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Concrete implementation of SpillableByteArrayListMultimap which is needed for the join operator.
+ *
+ * Properties:
+ * isKeyContainsMultiValue: Specifies whether the key has multiple values or not.
+ * timeBucket: Specifies the length of the time bucket.
+ *
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableByteArrayListMultimap<K,V>
+{
+  private transient StreamCodec streamCodec = null;
+  private boolean isKeyContainsMultiValue = false;
+  private long timeBucket;
+  @NotNull
+  private ManagedTimeStateImpl store;
+
+  public ManagedTimeStateMultiValue()
+  {
+    if (streamCodec == null) {
+      streamCodec = new KryoSerializableStreamCodec();
+    }
+  }
+
+  public ManagedTimeStateMultiValue(@NotNull ManagedTimeStateImpl store, boolean isKeyContainsMultiValue)
+  {
+    this();
+    this.store = Preconditions.checkNotNull(store);
+    this.isKeyContainsMultiValue = isKeyContainsMultiValue;
+  }
+
+  /**
+   * Return the list of values from the store
+   * @param k given key
+   * @return list of values
+   */
+  @Override
+  public List<V> get(@Nullable K k)
+  {
+    List<V> value = null;
+    Slice valueSlice = store.getSync(getBucketId(k), streamCodec.toByteArray(k));
+    if (valueSlice == null || valueSlice.length == 0 || valueSlice.buffer == null) {
+      return null;
+    }
+    if (isKeyContainsMultiValue) {
+      return (List<V>)streamCodec.fromByteArray(valueSlice);
+    }
+    value = new ArrayList<>();
+    value.add((V)streamCodec.fromByteArray(valueSlice));
+    return value;
+  }
+
+  /**
+   * Returns the Future from the store.
+   * @param k given key
+   * @return future holding the asynchronous read result
+   */
+  public CompositeFuture getAsync(@Nullable K k)
+  {
+    return new CompositeFuture(store.getAsync(getBucketId(k), streamCodec.toByteArray(k)));
+  }
+
+  @Override
+  public Set<K> keySet()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Multiset<K> keys()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<V> values()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<Map.Entry<K, V>> entries()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<V> removeAll(@Nullable Object o)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void clear()
+  {
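    // note: clear() is left empty in the quoted diff; the other unsupported mutators above throw UnsupportedOperationException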
+
+  }
+
+  @Override
  

[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-08 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844212
  
--- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java ---
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * An abstract implementation of inner join operator. The operator receives tuples from two streams,
+ * applies the join operation based on the constraint and emits the joined value.
+ * Concrete classes should provide implementations of the extractKey, extractTime, mergeTuples methods.
+ *
+ * Properties:
+ * includeFieldStr: List of comma separated fields to be added to the output tuple.
+ * Ex: Field1,Field2;Field3,Field4
+ * keyFields: List of comma separated key fields for both the streams. Ex: Field1,Field2
+ * timeFields: List of comma separated time fields for both the streams. Ex: Field1,Field2
+ * expiryTime: Expiry time for stored tuples
+ * isStream1KeyPrimary: Specifies whether the stream1 key is primary or not
+ * isStream2KeyPrimary: Specifies whether the stream2 key is primary or not
+ *
+ * Example:
+ * Left input port receives customer details and right input port receives order details.
+ * Schema for the Customer is of the form {ID, Name, CTime}
+ * Schema for the Order is of the form {OID, CID, OTime}
+ * Now, join the tuples of the Customer and Order streams where Customer.ID = Order.CID and the
+ * constraint is that matched tuples must have timestamps within 5 minutes of each other.
+ * Here, key fields = ID, CID and time fields = CTime, OTime, expiryTime = 5 minutes
+ *
+ * @displayName Abstract Inner Join Operator
+ * @tags join
+ */
+public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator
+{
+  protected transient String[][] includeFields;
+  protected transient List<String> keyFields;
+  protected transient List<String> timeFields;
+  @AutoMetric
+  private long tuplesJoinedPerSec;
+  private double windowTimeSec;
+  private int tuplesCount;
+  @NotNull
+  private String keyFieldsStr;
+  @NotNull
+  private String includeFieldStr;
+  private String timeFieldsStr;
+  private Long stream1ExpiryTime;
+  private Long stream2ExpiryTime;
+  private boolean isStream1KeyPrimary = true;
+  private boolean isStream2KeyPrimary = true;
+  protected SpillableComplexComponent component;
+  protected Spillable.SpillableByteArrayListMultimap<K,T> stream1Data;
+  protected Spillable.SpillableByteArrayListMultimap<K,T> stream2Data;
+
+  /**
+   * Process a tuple received on an input port with the following steps:
+   * 1) Extract the key from the given tuple
+   * 2) Insert <key,tuple> into the store, where store is stream1Data if the tuple
+   * arrives from stream1, and stream2Data otherwise.
+   * 3) Get the values for the key, if present, from the opposite store
+   * 4) Merge the given tuple with the values found in step (3)
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+   */
+  protected void processTuple(T tuple, boolean isStream1Data)
+  {
+    Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data;
+    K key = extractKey
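
The quoted diff is cut off by the mail archive at this point. Going only by the four steps in the javadoc above, the body plausibly continues along the lines of the sketch below; the mergeTuples call, its argument order, and the emitJoinedTuple helper are assumptions beyond the quoted text (the class comment names extractKey, extractTime and mergeTuples as the methods concrete classes implement):

    K key = extractKey(tuple, isStream1Data);        // step 1: key of the arriving tuple
    store.put(key, tuple);                           // step 2: save it in this stream's store
    // step 3: look up matching tuples in the opposite stream's store
    Spillable.SpillableByteArrayListMultimap<K,T> opposite = isStream1Data ? stream2Data : stream1Data;
    List<T> matches = opposite.get(key);
    if (matches != null) {
      for (T other : matches) {
        // step 4: merge each pair and emit the joined tuple
        T joined = isStream1Data ? mergeTuples(tuple, other) : mergeTuples(other, tuple);
        emitJoinedTuple(joined);                     // hypothetical helper; the actual emit path is not visible here
      }
    }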

[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-08 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844311
  
--- Diff: library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator and 
receives objects from both streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator<Object,Object> implements Operator.ActivationListener
+{
+  private static final transient Logger LOG = LoggerFactory.getLogger(POJOInnerJoinOperator.class);
+  private transient long timeIncrement;
+  private transient FieldObjectMap[] inputFieldObjects = (FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2);
+  protected transient Class outputClass;
+  private long time = System.currentTimeMillis();
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>()
+  {
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort<Object> input1 = new DefaultInputPort<Object>()
+  {
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      inputFieldObjects[0].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+
+    @Override
+    public void process(Object tuple)
+    {
+      processTuple(tuple,true);
+    }
+
+    @Override
+    public StreamCodec<Object> getStreamCodec()
+    {
+      return getInnerJoinStreamCodec(true);
+    }
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort<Object> input2 = new DefaultInputPort<Object>()
+  {
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      inputFieldObjects[1].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+
+    @Override
+    public void process(Object tuple)
+    {
+      processTuple(tuple,false);
+    }
+
+    @Override
+    public StreamCodec<Object> getStreamCodec()
+    {
+      return getInnerJoinStreamCodec(false);
+    }
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+        context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+    super.setup(context);
+    for (int i = 0; i < 2; i++) {
+      inputFieldObjects[i] = new FieldObjectMap();
+    }
+  }
+
+  /**
+   * Extract the time value from the given tuple
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+   * @return
+   */
+  @Override
+  public long extractTime(Object tuple, boolean isStream1Data)
+  {
+    ret
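
The diff is again truncated by the archive. For orientation only: given the time and timeIncrement members above and the timeFields list inherited from AbstractInnerJoinOperator, a body along the following lines would match the javadoc; the timeFieldGet getter on FieldObjectMap is hypothetical, not taken from the patch:

    // Sketch under assumptions: when time fields are configured, read the tuple's
    // time field through a PojoUtils getter; otherwise advance processing time.
    if (timeFields != null) {
      FieldObjectMap fieldMap = inputFieldObjects[isStream1Data ? 0 : 1];
      return (long)fieldMap.timeFieldGet.get(tuple);   // timeFieldGet: hypothetical getter
    }
    time += timeIncrement;
    return time;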

[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-08 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844328
  
--- Diff: library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator and 
receives objects from both streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator<Object,Object> implements Operator.ActivationListener
+{
+  private static final transient Logger LOG = LoggerFactory.getLogger(POJOInnerJoinOperator.class);
+  private transient long timeIncrement;
+  private transient FieldObjectMap[] inputFieldObjects = (FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2);
+  protected transient Class outputClass;
+  private long time = System.currentTimeMillis();
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>()
+  {
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort<Object> input1 = new DefaultInputPort<Object>()
+  {
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      inputFieldObjects[0].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+
+    @Override
+    public void process(Object tuple)
+    {
+      processTuple(tuple,true);
+    }
+
+    @Override
+    public StreamCodec<Object> getStreamCodec()
+    {
+      return getInnerJoinStreamCodec(true);
+    }
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort<Object> input2 = new DefaultInputPort<Object>()
+  {
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      inputFieldObjects[1].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+
+    @Override
+    public void process(Object tuple)
+    {
+      processTuple(tuple,false);
+    }
+
+    @Override
+    public StreamCodec<Object> getStreamCodec()
+    {
+      return getInnerJoinStreamCodec(false);
+    }
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+        context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+    super.setup(context);
+    for (int i = 0; i < 2; i++) {
+      inputFieldObjects[i] = new FieldObjectMap();
+    }
+  }
+
+  /**
+   * Extract the time value from the given tuple
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+   * @return
+   */
+  @Override
+  public long extractTime(Object tuple, boolean isStream1Data)
+  {
+    ret

[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-08 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844275
  
--- Diff: library/src/main/java/com/datatorrent/lib/join/InnerJoinStreamCodec.java ---
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Stream codec based on keyExpression for POJO Inner Join Operator.
+ *
+ * @Since 3.5.0
--- End diff --

Done




[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-08 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844228
  
--- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java ---
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue;
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import 
org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.hadoop.fs.Path;
+import com.google.common.collect.Maps;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * An abstract implementation of the inner join operator over Managed state, 
which extends
+ * AbstractInnerJoinOperator.
+ *
+ * Properties:
+ * noOfBuckets: Number of buckets required for Managed state. 
+ * bucketSpanTime: Indicates the length of the time bucket. 
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends 
AbstractInnerJoinOperator<K,T> implements
+Operator.CheckpointNotificationListener, 
Operator.CheckpointListener,Operator.IdleTimeHandler
--- End diff --

Done




[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-08 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844300
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator that 
receives objects from both streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class POJOInnerJoinOperator extends 
AbstractManagedStateInnerJoinOperator<Object,Object> implements 
Operator.ActivationListener
+{
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(POJOInnerJoinOperator.class);
+  private transient long timeIncrement;
+  private transient FieldObjectMap[] inputFieldObjects = 
(FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2);
+  protected transient Class outputClass;
+  private long time = System.currentTimeMillis();
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort outputPort = new 
DefaultOutputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input1 = new 
DefaultInputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[0].inputClass = 
context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,true);
+}
+
+@Override
+public StreamCodec getStreamCodec()
+{
+  return getInnerJoinStreamCodec(true);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input2 = new 
DefaultInputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[1].inputClass = 
context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,false);
+}
+
+@Override
+public StreamCodec getStreamCodec()
+{
+  return getInnerJoinStreamCodec(false);
+}
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+timeIncrement = 
context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+  context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+super.setup(context);
+for (int i = 0; i < 2; i++) {
+  inputFieldObjects[i] = new FieldObjectMap();
+}
+  }
+
+  /**
+   * Extract the time value from the given tuple
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to 
stream1 or not.
+   * @return the time value extracted from the tuple
+   */
+  @Override
+  public long extractTime(Object tuple, boolean isStream1Data)
+  {
+ret

[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-08 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844285
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator that 
receives objects from both streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class POJOInnerJoinOperator extends 
AbstractManagedStateInnerJoinOperator<Object,Object> implements 
Operator.ActivationListener
+{
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(POJOInnerJoinOperator.class);
--- End diff --

Done




[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-08 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844252
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java
 ---
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue;
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import 
org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.hadoop.fs.Path;
+import com.google.common.collect.Maps;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * An abstract implementation of the inner join operator over Managed state, 
which extends
+ * AbstractInnerJoinOperator.
+ *
+ * Properties:
+ * noOfBuckets: Number of buckets required for Managed state. 
+ * bucketSpanTime: Indicates the length of the time bucket. 
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends 
AbstractInnerJoinOperator<K,T> implements
+Operator.CheckpointNotificationListener, 
Operator.CheckpointListener,Operator.IdleTimeHandler
+{
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(AbstractManagedStateInnerJoinOperator.class);
+  public static final String stateDir = "managedState";
+  public static final String stream1State = "stream1Data";
+  public static final String stream2State = "stream2Data";
+  private transient Map<JoinEvent<K,T>, Future> waitingEvents = 
Maps.newLinkedHashMap();
+  private int noOfBuckets = 1;
+  private Long bucketSpanTime;
+  protected ManagedTimeStateImpl stream1Store;
+  protected ManagedTimeStateImpl stream2Store;
+
+  /**
+   * Create Managed states and stores for both the streams.
+   */
+  @Override
+  public void createStores()
+  {
+stream1Store = new ManagedTimeStateImpl();
+stream2Store = new ManagedTimeStateImpl();
+stream1Store.setNumBuckets(noOfBuckets);
+stream2Store.setNumBuckets(noOfBuckets);
+if (bucketSpanTime != null) {
+  
stream1Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime));
+  
stream2Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime));
+}
+
stream1Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime()));
+
stream2Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime()));
+
+component = new ManagedSpillableComplexComponent();
+stream1Data = 
((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream1Store,
 !isStream1KeyPrimary());
+stream2Data = 
((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream2Store,
 !isStream2KeyPrimary());
+  }
+
+  /**
+   * Process the tuples which are received from the input ports with the 
following steps:
+   * 1) Extract the key from the given tuple
+   * 2) Insert <key,tuple> into the store, where the store is stream1Data 
if the tuple
+   * is received from stream1, or vice versa.
+   * 3) Get the values of the key asynchronously if it is found in the opposite 
store

[GitHub] apex-malhar pull request #351: APEXMALHAR-2169 Fixed the issue of dynamic pa...

2016-08-07 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/351#discussion_r73824988
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
 ---
@@ -188,6 +188,8 @@
   @Min(1)
   private int initialPartitionCount = 1;
 
+  private boolean isPartitionBasedOnLoad = false;
--- End diff --

Can this be achieved as follows? Please correct me if I am wrong.
if (msgRateUpperBound != Long.MAX_VALUE) {
  // Dynamic partitioning based on load is enabled
} else {
  // Dynamic partitioning based on load is disabled
}
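
A minimal sketch of that idea, assuming msgRateUpperBound defaults to 
Long.MAX_VALUE when no upper bound is configured (the class and helper 
below are hypothetical illustrations, not the actual operator code):

class LoadBasedPartitioningCheck
{
  // Long.MAX_VALUE acts as the "no bound configured" sentinel, so no 
  // separate flag such as isPartitionBasedOnLoad is needed.
  private long msgRateUpperBound = Long.MAX_VALUE;

  boolean isLoadBasedPartitioningEnabled()
  {
    return msgRateUpperBound != Long.MAX_VALUE;
  }
}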





[GitHub] apex-malhar pull request #351: APEXMALHAR-2169 Fixed the issue of dynamic pa...

2016-08-07 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/351#discussion_r73824992
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
 ---
@@ -834,17 +840,16 @@ private boolean isPartitionRequired(int opid, 
List

[GitHub] apex-malhar pull request #360: APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 r...

2016-08-03 Thread chaithu14
GitHub user chaithu14 opened a pull request:

https://github.com/apache/apex-malhar/pull/360

APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 reader issue



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar 
APEXMALHAR-2174-S3-ReaderIssue

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/360.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #360


commit 01b0f42d1d0ab2e6030e390e10e1dafba72f3302
Author: Chaitanya <chaita...@datatorrent.com>
Date:   2016-08-03T10:13:30Z

APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 reader issue






[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-03 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73290027
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
 ---
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Concrete implementation of SpillableByteArrayListMultimap which is 
needed for the join operator.
+ *
+ * Properties:
+ * isKeyContainsMultiValue: Specifies whether the key has multiple 
values or not. 
+ * timeBucket: Specifies the length of the time bucket.
+ *
+ */
+public class ManagedTimeStateMultiValue<K,V> implements 
Spillable.SpillableByteArrayListMultimap<K,V>
+{
+  private transient StreamCodec streamCodec = null;
+  private boolean isKeyContainsMultiValue = false;
+  private long timeBucket;
+  @NotNull
+  private ManagedTimeStateImpl store;
+
+  public ManagedTimeStateMultiValue()
+  {
+if (streamCodec == null) {
+  streamCodec = new KryoSerializableStreamCodec();
+}
+  }
+
+  public ManagedTimeStateMultiValue(@NotNull ManagedTimeStateImpl store, 
boolean isPrimaryKey)
+  {
+this();
+this.store = Preconditions.checkNotNull(store);
+this.isKeyContainsMultiValue = isPrimaryKey;
+  }
+
+  /**
+   * Return the list of values from the store
+   * @param k given key
+   * @return list of values
+   */
+  @Override
+  public List get(@Nullable K k)
+  {
+List value = null;
+Slice valueSlice = store.getSync(getBucketId(k), 
streamCodec.toByteArray(k));
+if (isKeyContainsMultiValue) {
+  value = (List)streamCodec.fromByteArray(valueSlice);
+}  else {
+  if (valueSlice == null || valueSlice.length == 0 || 
valueSlice.buffer == null) {
+return null;
+  }
+  value = new ArrayList<>();
+  value.add((V)streamCodec.fromByteArray(valueSlice));
+}
+return  value;
+  }
+
+  /**
+   * Returns the Future from the store.
+   * @param k given key
+   * @return the Future from the store
+   */
+  public CompositeFuture getAsync(@Nullable K k)
+  {
+return new CompositeFuture(store.getAsync(getBucketId(k), 
streamCodec.toByteArray(k)));
+  }
+
+  @Override
+  public Set keySet()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Multiset keys()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection values()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<Map.Entry<K, V>> entries()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List removeAll(@Nullable Object o)
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void clear()
+  {
+
+  }
+
+  @Override
+  public int size()
+  {
+throw new UnsupportedOperationException();

[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-03 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73289926
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator that 
receives objects from both streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+public class POJOInnerJoinOperator extends 
AbstractManagedStateInnerJoinOperator<Object,Object> implements 
Operator.ActivationListener
+{
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(POJOInnerJoinOperator.class);
+  private transient long timeIncrement;
+  private transient FieldObjectMap[] inputFieldObjects = 
(FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2);
+  protected Class outputClass;
+  private long time = System.currentTimeMillis();
--- End diff --

Yes. It will be restored from the state when the operator crashes and comes back.




[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-03 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73289947
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator that 
receives objects from both streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+public class POJOInnerJoinOperator extends 
AbstractManagedStateInnerJoinOperator<Object,Object> implements 
Operator.ActivationListener
+{
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(POJOInnerJoinOperator.class);
+  private transient long timeIncrement;
+  private transient FieldObjectMap[] inputFieldObjects = 
(FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2);
+  protected Class outputClass;
+  private long time = System.currentTimeMillis();
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort outputPort = new 
DefaultOutputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input1 = new 
DefaultInputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[0].inputClass = 
context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,true);
+}
+
+@Override
+public StreamCodec getStreamCodec()
+{
+  return getInnerJoinStreamCodec(true);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input2 = new 
DefaultInputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[1].inputClass = 
context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,false);
+}
+
+@Override
+public StreamCodec getStreamCodec()
+{
+  return getInnerJoinStreamCodec(false);
+}
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+timeIncrement = 
context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+  context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+super.setup(context);
+for (int i = 0; i < 2; i++) {
+  inputFieldObjects[i] = new FieldObjectMap();
+}
+  }
+
+  /**
+   * Extract the time value from the given tuple
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to 
stream1 or not.
+   * @return the time value extracted from the tuple
+   */
+  @Override
+  public long extractTime(Object tuple, boolean isStream1Data)
+  {
+return timeFields == null ? time : (long)(isStream1Data ? 
inputFieldObje

[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-03 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73289992
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
 ---
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Concrete implementation of SpillableByteArrayListMultimap which is 
needed for the join operator.
+ *
+ * Properties:
+ * isKeyContainsMultiValue: Specifies whether the key has multiple 
values or not. 
+ * timeBucket: Specifies the length of the time bucket.
+ *
+ */
+public class ManagedTimeStateMultiValue<K,V> implements 
Spillable.SpillableByteArrayListMultimap<K,V>
+{
+  private transient StreamCodec streamCodec = null;
+  private boolean isKeyContainsMultiValue = false;
+  private long timeBucket;
+  @NotNull
+  private ManagedTimeStateImpl store;
+
+  public ManagedTimeStateMultiValue()
+  {
+if (streamCodec == null) {
+  streamCodec = new KryoSerializableStreamCodec();
+}
+  }
+
+  public ManagedTimeStateMultiValue(@NotNull ManagedTimeStateImpl store, 
boolean isPrimaryKey)
+  {
+this();
+this.store = Preconditions.checkNotNull(store);
+this.isKeyContainsMultiValue = isPrimaryKey;
+  }
+
+  /**
+   * Return the list of values from the store
+   * @param k given key
+   * @return list of values
+   */
+  @Override
+  public List get(@Nullable K k)
+  {
+List value = null;
+Slice valueSlice = store.getSync(getBucketId(k), 
streamCodec.toByteArray(k));
+if (isKeyContainsMultiValue) {
+  value = (List)streamCodec.fromByteArray(valueSlice);
+}  else {
+  if (valueSlice == null || valueSlice.length == 0 || 
valueSlice.buffer == null) {
--- End diff --

Yes. Done.




[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-03 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73289869
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/ManagedSpillableComplexComponent.java
 ---
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue;
+import 
org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * SpillableComplexComponent for Join Operator
+ */
+public class ManagedSpillableComplexComponent implements 
SpillableComplexComponent
--- End diff --

Done




[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-03 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73289943
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator that 
receives objects from both streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+public class POJOInnerJoinOperator extends 
AbstractManagedStateInnerJoinOperator<Object,Object> implements 
Operator.ActivationListener
+{
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(POJOInnerJoinOperator.class);
+  private transient long timeIncrement;
+  private transient FieldObjectMap[] inputFieldObjects = 
(FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2);
+  protected Class outputClass;
+  private long time = System.currentTimeMillis();
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort outputPort = new 
DefaultOutputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input1 = new 
DefaultInputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[0].inputClass = 
context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,true);
+}
+
+@Override
+public StreamCodec getStreamCodec()
+{
+  return getInnerJoinStreamCodec(true);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input2 = new 
DefaultInputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[1].inputClass = 
context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,false);
+}
+
+@Override
+public StreamCodec getStreamCodec()
+{
+  return getInnerJoinStreamCodec(false);
+}
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+timeIncrement = 
context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+  context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+super.setup(context);
+for (int i = 0; i < 2; i++) {
+  inputFieldObjects[i] = new FieldObjectMap();
+}
+  }
+
+  /**
+   * Extract the time value from the given tuple
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to 
stream1 or not.
+   * @return the time value extracted from the tuple
+   */
+  @Override
+  public long extractTime(Object tuple, boolean isStream1Data)
+  {
+return timeFields == null ? time : (long)(isStream1Data ? 
inputFieldObje

[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-03 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73289857
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java
 ---
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue;
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.hadoop.fs.Path;
+import com.google.common.collect.Maps;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+
+/**
+ * An abstract implementation of the inner join operator over Managed state, 
which extends
+ * AbstractInnerJoinOperator.
+ *
+ * Properties:
+ * noOfBuckets: Number of buckets required for Managed state. 
+ * bucketSpanTime: Indicates the length of the time bucket. 
+ */
+public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends 
AbstractInnerJoinOperator<K,T> implements
+Operator.CheckpointNotificationListener, 
Operator.CheckpointListener,Operator.IdleTimeHandler
+{
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(AbstractManagedStateInnerJoinOperator.class);
+  public static final String stateDir = "managedState";
+  public static final String stream1State = "stream1Data";
+  public static final String stream2State = "stream2Data";
+  private transient long sleepMillis;
+  private transient Map<JoinEvent<K,T>, Future> waitingEvents = 
Maps.newLinkedHashMap();
+  private int noOfBuckets = 1;
+  private Long bucketSpanTime;
+  protected ManagedTimeStateImpl stream1Store;
+  protected ManagedTimeStateImpl stream2Store;
+
+  /**
+   * Create Managed states and stores for both the streams.
+   */
+  @Override
+  public void createStores()
+  {
+stream1Store = new ManagedTimeStateImpl();
+stream2Store = new ManagedTimeStateImpl();
+stream1Store.setNumBuckets(noOfBuckets);
+stream2Store.setNumBuckets(noOfBuckets);
+if (bucketSpanTime != null) {
+  
stream1Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime));
+  
stream2Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime));
+}
+
+if (getStream1ExpiryTime() != null) {
+  
stream1Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getStream1ExpiryTime()));
+}
+if (getStream2ExpiryTime() != null) {
+  
stream2Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getStream2ExpiryTime()));
+}
+
+component = new ManagedSpillableComplexComponent();
+stream1Data = 
((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream1Store,
 !isStream1KeyPrimary());
+stream2Data = 
((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream2Store,
 !isStream2KeyPrimary());
+  }
+
+  /**
+   * Process the tuples which are received from the input ports with the 
following steps:
+   * 1) Extract the key from the given tuple
+   * 2) Insert <key,tuple> into the store, where the store is stream1Data 
if the tuple
+   * is received from stream1, or vice versa.
+   * 3) Get the values of the key asynchronously if it is found in the opposite 
store
+   * 4) If the future is done, then merge the given tuple with the values found
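
Reading the steps above, a minimal sketch of the async path (reconstructed 
from the documented steps, since the diff is truncated here; extractKey's 
signature, the JoinEvent constructor, and the mergeAndEmit helper are 
assumptions, not confirmed by the excerpt):

protected void processTuple(T tuple, boolean isStream1Data)
{
  // 1) Extract the key from the given tuple.
  K key = extractKey(tuple, isStream1Data);

  // 2) Insert <key, tuple> into the store of the stream it arrived on.
  ManagedTimeStateMultiValue<K,T> thisStore =
      (ManagedTimeStateMultiValue<K,T>)(isStream1Data ? stream1Data : stream2Data);
  thisStore.put(key, tuple);

  // 3) Look up the key in the opposite store asynchronously.
  ManagedTimeStateMultiValue<K,T> otherStore =
      (ManagedTimeStateMultiValue<K,T>)(isStream1Data ? stream2Data : stream1Data);
  Future future = otherStore.getAsync(key);

  try {
    if (future.isDone()) {
      // 4) The future already completed: merge the given tuple with the
      // matched values and emit the joined result.
      mergeAndEmit(tuple, isStream1Data, (List<T>)future.get());
    } else {
      // Otherwise park the event; waitingEvents is drained later, e.g.
      // from handleIdleTime() or at the window boundary.
      waitingEvents.put(new JoinEvent<K,T>(key, tuple, isStream1Data), future);
    }
  } catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
  }
}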

[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-03 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73289845
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java
 ---
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue;
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.hadoop.fs.Path;
+import com.google.common.collect.Maps;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+
+/**
+ * An abstract implementation of the inner join operator over Managed state, 
which extends
+ * AbstractInnerJoinOperator.
+ *
+ * Properties:
+ * noOfBuckets: Number of buckets required for Managed state. 
+ * bucketSpanTime: Indicates the length of the time bucket. 
+ */
+public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends 
AbstractInnerJoinOperator<K,T> implements
+Operator.CheckpointNotificationListener, 
Operator.CheckpointListener,Operator.IdleTimeHandler
+{
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(AbstractManagedStateInnerJoinOperator.class);
+  public static final String stateDir = "managedState";
+  public static final String stream1State = "stream1Data";
+  public static final String stream2State = "stream2Data";
+  private transient long sleepMillis;
+  private transient Map<JoinEvent<K,T>, Future> waitingEvents = 
Maps.newLinkedHashMap();
+  private int noOfBuckets = 1;
+  private Long bucketSpanTime;
+  protected ManagedTimeStateImpl stream1Store;
+  protected ManagedTimeStateImpl stream2Store;
+
+  /**
+   * Create Managed states and stores for both the streams.
+   */
+  @Override
+  public void createStores()
+  {
+stream1Store = new ManagedTimeStateImpl();
+stream2Store = new ManagedTimeStateImpl();
+stream1Store.setNumBuckets(noOfBuckets);
+stream2Store.setNumBuckets(noOfBuckets);
+if (bucketSpanTime != null) {
+  
stream1Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime));
+  
stream2Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime));
+}
+
+if (getStream1ExpiryTime() != null) {
+  
stream1Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getStream1ExpiryTime()));
+}
+if (getStream2ExpiryTime() != null) {
+  
stream2Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getStream2ExpiryTime()));
+}
+
+component = new ManagedSpillableComplexComponent();
+stream1Data = 
((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream1Store,
 !isStream1KeyPrimary());
+stream2Data = 
((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream2Store,
 !isStream2KeyPrimary());
+  }
+
+  /**
+   * Process the tuples which are received from the input ports with the 
following steps:
+   * 1) Extract the key from the given tuple
+   * 2) Insert <key,tuple> into the store, where the store is stream1Data 
if the tuple
+   * is received from stream1, or vice versa.
+   * 3) Get the values of the key asynchronously if it is found in the opposite 
store
+   * 4) If the future is done, then merge the given tuple with the values found

[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-03 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73289826
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java 
---
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import 
org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import 
org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * 
+ * An abstract implementation of the inner join operator. The operator receives 
tuples from two streams,
+ * applies the join operation based on the constraint, and emits the joined 
value.
+ * Concrete classes should provide implementations of the extractKey, 
extractTime, and mergeTuples methods.
+ *
+ * Properties:
+ * includeFieldStr: List of comma separated fields to be added to 
the output tuple.
+ * Ex: Field1,Field2;Field3,Field4
+ * keyFields: List of comma separated key field for both the 
streams. Ex: Field1,Field2
+ * timeFields: List of comma separated time field for both the 
streams. Ex: Field1,Field2
+ * expiryTime: Expiry time for stored tuples
+ * isStream1KeyPrimary: Specifies whether the stream1 key is 
primary or not
+ * isStream2KeyPrimary: Specifies whether the stream2 key is 
primary or not
+ *
+ *  Example:  
+ *  Left input port receives customer details and right input port 
receives Order details.
+ *  Schema for the Customer be in the form of {ID, Name, CTime}
+ *  Schema for the Order be in the form of {OID, CID, OTime}
+ *  Now, Join the tuples of Customer and Order streams where Customer.ID = 
Order.CID and the constraint is
+ *  matched tuples must have timestamp within 5 minutes.
+ *  Here, key Fields = ID, CID and Time Fields = CTime, OTime, expiryTime 
= 5 minutes  
+ *
+ *  @displayName Abstract Inner Join Operator
+ *  @tags join
+ */
+public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator
+{
+  protected transient String[][] includeFields;
+  protected transient List keyFields;
+  protected transient List timeFields;
+  @AutoMetric
+  private long tuplesJoinedPerSec;
+  private double windowTimeSec;
+  private int tuplesCount;
+  @NotNull
+  private String keyFieldsStr;
+  @NotNull
+  private String includeFieldStr;
+  private String timeFieldsStr;
+  private Long stream1ExpiryTime;
+  private Long stream2ExpiryTime;
+  private boolean isStream1KeyPrimary = true;
+  private boolean isStream2KeyPrimary = true;
+  protected SpillableComplexComponent component;
+  protected Spillable.SpillableByteArrayListMultimap<K,T> stream1Data;
+  protected Spillable.SpillableByteArrayListMultimap<K,T> stream2Data;
+
+  /**
+   * Process the tuples which are received from the input ports with the 
following steps:
+   * 1) Extract the key from the given tuple
+   * 2) Insert <key,tuple> into the store, where the store is stream1Data 
if the tuple
+   * is received from stream1, or vice versa.
+   * 3) Get the values of the key if it is found in the opposite store
+   * 4) Merge the given tuple and the values found in step (3)
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to 
stream1 or not.
+   */
+  protected void processTuple(T tuple, boolean isStream1Data)
+  {
+Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? 
stream1Data : stream2Data;
+K key = extractKey
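
For reference, a hedged configuration sketch for the Customer/Order example 
in the class javadoc above (the setter names assume the usual bean convention 
for the fields shown in this diff and are not confirmed by the excerpt):

POJOInnerJoinOperator join = new POJOInnerJoinOperator();
// Customer.ID joins against Order.CID.
join.setKeyFieldsStr("ID,CID");
// Per-stream event-time fields used for the 5-minute constraint.
join.setTimeFieldsStr("CTime,OTime");
// Fields copied from each stream into the output tuple.
join.setIncludeFieldStr("ID,Name;OID,OTime");
// Expire stored tuples after 5 minutes (in milliseconds), per stream.
join.setStream1ExpiryTime(5 * 60 * 1000L);
join.setStream2ExpiryTime(5 * 60 * 1000L);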

[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-02 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73282037
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator that 
receives objects from both streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+public class POJOInnerJoinOperator extends 
AbstractManagedStateInnerJoinOperator<Object,Object> implements 
Operator.ActivationListener
+{
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(POJOInnerJoinOperator.class);
+  private transient long timeIncrement;
+  private transient FieldObjectMap[] inputFieldObjects = 
(FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2);
+  protected Class outputClass;
+  private long time = System.currentTimeMillis();
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort outputPort = new 
DefaultOutputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input1 = new 
DefaultInputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[0].inputClass = 
context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,true);
+}
+
+@Override
+public StreamCodec getStreamCodec()
+{
+  return getInnerJoinStreamCodec(true);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input2 = new 
DefaultInputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[1].inputClass = 
context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,false);
+}
+
+@Override
+public StreamCodec getStreamCodec()
+{
+  return getInnerJoinStreamCodec(false);
+}
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+timeIncrement = 
context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+  context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+super.setup(context);
+for (int i = 0; i < 2; i++) {
+  inputFieldObjects[i] = new FieldObjectMap();
+}
+  }
+
+  /**
+   * Extract the time value from the given tuple
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to 
stream1 or not.
+   * @return the time value extracted from the tuple
+   */
+  @Override
+  public long extractTime(Object tuple, boolean isStream1Data)
+  {
+return timeFields == null ? time : (long)(isStream1Data ? 
inputFieldObje
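
The return statement above is truncated in every copy of this excerpt; a 
plausible reconstruction, assuming each FieldObjectMap holds a PojoUtils 
getter for its stream's time field (the timeFieldGet name is a guess, not 
confirmed by the diff):

@Override
public long extractTime(Object tuple, boolean isStream1Data)
{
  // With no time fields configured, fall back to the operator's
  // wall-clock 'time'; otherwise read the tuple's own time field
  // through the getter built for the matching input stream.
  return timeFields == null ? time
      : (long)(isStream1Data ? inputFieldObjects[0].timeFieldGet.get(tuple)
                             : inputFieldObjects[1].timeFieldGet.get(tuple));
}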

[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-02 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73282003
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
 ---
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Concrete implementation of SpillableByteArrayListMultimap which is 
needed for join operator.
+ *
+ * Properties:
+ * isKeyContainsMultiValue: Specifies whether the key has multiple values or not.
+ * timeBucket: Specifies the length of the time bucket.
+ *
+ */
+public class ManagedTimeStateMultiValue<K,V> implements 
Spillable.SpillableByteArrayListMultimap<K,V>
+{
+  private transient StreamCodec streamCodec = null;
+  private boolean isKeyContainsMultiValue = false;
+  private long timeBucket;
+  @NotNull
+  private ManagedTimeStateImpl store;
+
+  public ManagedTimeStateMultiValue()
+  {
+if (streamCodec == null) {
+  streamCodec = new KryoSerializableStreamCodec();
+}
+  }
+
+  public ManagedTimeStateMultiValue(@NotNull ManagedTimeStateImpl store, 
boolean isPrimaryKey)
+  {
+this();
+this.store = Preconditions.checkNotNull(store);
+this.isKeyContainsMultiValue = isPrimaryKey;
+  }
+
+  /**
+   * Return the list of values from the store
+   * @param k given key
+   * @return list of values
+   */
+  @Override
+  public List<V> get(@Nullable K k)
+  {
+List<V> value = null;
+Slice valueSlice = store.getSync(getBucketId(k), 
streamCodec.toByteArray(k));
+if (isKeyContainsMultiValue) {
+  value = (List<V>)streamCodec.fromByteArray(valueSlice);
+}  else {
+  if (valueSlice == null || valueSlice.length == 0 || 
valueSlice.buffer == null) {
+return null;
+  }
+  value = new ArrayList<>();
+  value.add((V)streamCodec.fromByteArray(valueSlice));
+}
+return  value;
+  }
+
+  /**
+   * Returns the Future from the store.
+   * @param k given key
+   * @return
+   */
+  public CompositeFuture getAsync(@Nullable K k)
+  {
+return new CompositeFuture(store.getAsync(getBucketId(k), 
streamCodec.toByteArray(k)));
+  }
+
+  @Override
+  public Set<K> keySet()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Multiset<K> keys()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<V> values()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<Map.Entry<K, V>> entries()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<V> removeAll(@Nullable Object o)
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void clear()
+  {
+
+  }
+
+  @Override
+  public int size()
+  {
+throw new UnsupportedOperationException();

[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-02 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73281797
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java
 ---
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue;
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.hadoop.fs.Path;
+import com.google.common.collect.Maps;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+
+/**
+ * An abstract implementation of inner join operator over Managed state 
which extends from
+ * AbstractInnerJoinOperator.
+ *
+ * Properties:
+ * noOfBuckets: Number of buckets required for Managed state. 
+ * bucketSpanTime: Indicates the length of the time bucket. 
+ */
+public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends 
AbstractInnerJoinOperator<K,T> implements
+Operator.CheckpointNotificationListener, 
Operator.CheckpointListener,Operator.IdleTimeHandler
+{
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(AbstractManagedStateInnerJoinOperator.class);
+  public static final String stateDir = "managedState";
+  public static final String stream1State = "stream1Data";
+  public static final String stream2State = "stream2Data";
+  private transient long sleepMillis;
+  private transient Map<JoinEvent<K,T>, Future> waitingEvents = 
Maps.newLinkedHashMap();
+  private int noOfBuckets = 1;
+  private Long bucketSpanTime;
+  protected ManagedTimeStateImpl stream1Store;
+  protected ManagedTimeStateImpl stream2Store;
+
+  /**
+   * Create Managed states and stores for both the streams.
+   */
+  @Override
+  public void createStores()
+  {
+stream1Store = new ManagedTimeStateImpl();
+stream2Store = new ManagedTimeStateImpl();
+stream1Store.setNumBuckets(noOfBuckets);
+stream2Store.setNumBuckets(noOfBuckets);
+if (bucketSpanTime != null) {
--- End diff --

OK. I will make expiryTime mandatory. 
I thought of a different use case, but it is invalid in a streaming scenario. 
I will use a single expiry time for both of the streams.
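
For illustration, a minimal sketch of how a single expiry time could then be set 
when wiring the operator; the operator name and the setExpiryTime(Long) setter 
are assumptions here (the diff only shows getExpiryTime()), not the final API:

    // Assumed: a single expiryTime property, in milliseconds, applied to both stream stores.
    POJOInnerJoinOperator join = dag.addOperator("Join", new POJOInnerJoinOperator());
    join.setExpiryTime(5 * 60 * 1000L); // 5 minutes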




[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-02 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73281761
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator and 
receives objects from both streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+public class POJOInnerJoinOperator extends 
AbstractManagedStateInnerJoinOperator<Object,Object> implements 
Operator.ActivationListener
+{
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(POJOInnerJoinOperator.class);
+  private transient long timeIncrement;
+  private transient FieldObjectMap[] inputFieldObjects = 
(FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2);
+  protected Class outputClass;
+  private long time = System.currentTimeMillis();
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort<Object> input1 = new DefaultInputPort<Object>()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[0].inputClass = 
context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,true);
+}
+
+@Override
+public StreamCodec<Object> getStreamCodec()
+{
+  return getInnerJoinStreamCodec(true);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort<Object> input2 = new DefaultInputPort<Object>()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[1].inputClass = 
context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,false);
+}
+
+@Override
+public StreamCodec<Object> getStreamCodec()
+{
+  return getInnerJoinStreamCodec(false);
+}
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+timeIncrement = 
context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+  context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+super.setup(context);
+for (int i = 0; i < 2; i++) {
+  inputFieldObjects[i] = new FieldObjectMap();
--- End diff --

No. The declaration only creates the array itself, with the FieldObjectMap type; the elements are not instantiated.
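
To make the point concrete, a minimal standalone sketch (FieldObjectMap stands 
in for the operator's inner class):

    // Array.newInstance allocates the array, but its elements start out null,
    // so each slot still has to be instantiated; hence the loop in setup().
    FieldObjectMap[] fields = (FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2);
    assert fields[0] == null && fields[1] == null; // the declaration created no elements
    for (int i = 0; i < fields.length; i++) {
      fields[i] = new FieldObjectMap();
    }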




[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-02 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73145124
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java 
---
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import 
org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import 
org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * 
+ * An abstract implementation of the inner join operator. The operator receives tuples from two streams,
+ * applies the join operation based on the constraint and emits the joined value.
+ * Concrete classes should provide implementation to extractKey, 
extractTime, mergeTuples methods.
+ *
+ * Properties:
+ * includeFieldStr: List of comma separated fields to be added to 
the output tuple.
+ * Ex: Field1,Field2;Field3,Field4
+ * keyFields: List of comma separated key field for both the 
streams. Ex: Field1,Field2
+ * timeFields: List of comma separated time field for both the 
streams. Ex: Field1,Field2
+ * expiryTime: Expiry time for stored tuples
+ * isStream1KeyPrimary: Specifies whether the stream1 key is primary or not
+ * isStream2KeyPrimary: Specifies whether the stream2 key is primary or not
+ *
+ *  Example:  
+ *  Left input port receives customer details and right input port 
receives Order details.
+ *  Schema for the Customer be in the form of {ID, Name, CTime}
+ *  Schema for the Order be in the form of {OID, CID, OTime}
+ *  Now, Join the tuples of Customer and Order streams where Customer.ID = 
Order.CID and the constraint is
+ *  matched tuples must have timestamp within 5 minutes.
+ *  Here, key Fields = ID, CID and Time Fields = CTime, OTime, expiryTime 
= 5 minutes  
+ *
+ *  @displayName Abstract Inner Join Operator
+ *  @tags join
+ */
+public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator
+{
+  protected transient String[][] includeFields;
+  protected transient List<String> keyFields;
+  protected transient List<String> timeFields;
+  @AutoMetric
+  private long tuplesJoinedPerSec;
+  private double windowTimeSec;
+  private int tuplesCount;
+  @NotNull
+  private String keyFieldsStr;
+  @NotNull
+  private String includeFieldStr;
+  private String timeFieldsStr;
+  private Long stream1ExpiryTime;
+  private Long stream2ExpiryTime;
+  private boolean isStream1KeyPrimary = true;
+  private boolean isStream2KeyPrimary = true;
+  protected SpillableComplexComponent component;
+  protected Spillable.SpillableByteArrayListMultimap<K,T> stream1Data;
+  protected Spillable.SpillableByteArrayListMultimap<K,T> stream2Data;
+
+  /**
+   * Process the tuple received from the input ports with the following steps:
+   * 1) Extract the key from the given tuple
+   * 2) Insert <key,tuple> into the store, where the store is stream1Data if the
+   * tuple is received from stream1, or vice versa.
+   * 3) Get the values of the key if found it in opposite store
+   * 4) Merge the given tuple and values found from step (3)
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to 
stream1 or not.
+   */
+  protected void processTuple(T tuple, boolean isStream1Data)
+  {
+Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? 
stream1Data : stream2Data;
+K key = extractKey(tuple,isStream1Data);

[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-02 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73141502
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java 
---
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import 
org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import 
org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * 
+ * An abstract implementation of the inner join operator. The operator receives tuples from two streams,
+ * applies the join operation based on the constraint and emits the joined value.
+ * Concrete classes should provide implementation to extractKey, 
extractTime, mergeTuples methods.
+ *
+ * Properties:
+ * includeFieldStr: List of comma separated fields to be added to 
the output tuple.
+ * Ex: Field1,Field2;Field3,Field4
+ * keyFields: List of comma separated key field for both the 
streams. Ex: Field1,Field2
+ * timeFields: List of comma separated time field for both the 
streams. Ex: Field1,Field2
--- End diff --

Yes, it can be. If the timeField of the stream is not in milliseconds, then 
the user has to override the extractTime() method and convert the field into 
milliseconds.
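
As a hedged sketch of that override, assuming a POJO whose time field is in 
seconds; the event type and getter below are made up for illustration:

    @Override
    public long extractTime(Object tuple, boolean isStream1Data)
    {
      MyEvent event = (MyEvent)tuple;          // hypothetical POJO type
      return event.getEventTimeSecs() * 1000L; // convert seconds to milliseconds
    }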




[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-02 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73141529
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java 
---
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import 
org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import 
org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * 
+ * An abstract implementation of the inner join operator. The operator receives tuples from two streams,
+ * applies the join operation based on the constraint and emits the joined value.
+ * Concrete classes should provide implementation to extractKey, 
extractTime, mergeTuples methods.
+ *
+ * Properties:
+ * includeFieldStr: List of comma separated fields to be added to 
the output tuple.
+ * Ex: Field1,Field2;Field3,Field4
+ * keyFields: List of comma separated key field for both the 
streams. Ex: Field1,Field2
+ * timeFields: List of comma separated time field for both the 
streams. Ex: Field1,Field2
+ * expiryTime: Expiry time for stored tuples
+ * isStream1KeyPrimary: Specifies whether the stream1 key is primary or not
+ * isStream2KeyPrimary: Specifies whether the stream2 key is primary or not
+ *
+ *  Example:  
+ *  Left input port receives customer details and right input port 
receives Order details.
+ *  Schema for the Customer be in the form of {ID, Name, CTime}
+ *  Schema for the Order be in the form of {OID, CID, OTime}
+ *  Now, Join the tuples of Customer and Order streams where Customer.ID = 
Order.CID and the constraint is
+ *  matched tuples must have timestamp within 5 minutes.
+ *  Here, key Fields = ID, CID and Time Fields = CTime, OTime, expiryTime 
= 5 minutes  
+ *
+ *  @displayName Abstract Inner Join Operator
+ *  @tags join
+ */
+public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator
+{
+  protected transient String[][] includeFields;
+  protected transient List<String> keyFields;
+  protected transient List<String> timeFields;
+  @AutoMetric
+  private long tuplesJoinedPerSec;
+  private double windowTimeSec;
+  private int tuplesCount;
+  @NotNull
+  private String keyFieldsStr;
+  @NotNull
+  private String includeFieldStr;
+  private String timeFieldsStr;
+  private Long stream1ExpiryTime;
+  private Long stream2ExpiryTime;
+  private boolean isStream1KeyPrimary = true;
+  private boolean isStream2KeyPrimary = true;
+  protected SpillableComplexComponent component;
+  protected Spillable.SpillableByteArrayListMultimap<K,T> stream1Data;
+  protected Spillable.SpillableByteArrayListMultimap<K,T> stream2Data;
+
+  /**
+   * Process the tuple received from the input ports with the following steps:
+   * 1) Extract the key from the given tuple
+   * 2) Insert <key,tuple> into the store, where the store is stream1Data if the
+   * tuple is received from stream1, or vice versa.
+   * 3) Get the values of the key if found it in opposite store
+   * 4) Merge the given tuple and values found from step (3)
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to 
stream1 or not.
+   */
+  protected void processTuple(T tuple, boolean isStream1Data)
+  {
+Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? 
stream1Data : stream2Data;
+K key = extractKey(tuple,isStream1Data);

[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-02 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73141506
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java 
---
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import 
org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import 
org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * 
+ * An abstract implementation of inner join operator. Operator receives 
tuples from two streams,
+ * applies the join operation based on constraint and emit the joined 
value.
+ * Concrete classes should provide implementation to extractKey, 
extractTime, mergeTuples methods.
+ *
+ * Properties:
+ * includeFieldStr: List of comma separated fields to be added to 
the output tuple.
+ * Ex: Field1,Field2;Field3,Field4
+ * keyFields: List of comma separated key field for both the 
streams. Ex: Field1,Field2
+ * timeFields: List of comma separated time field for both the 
streams. Ex: Field1,Field2
+ * expiryTime: Expiry time for stored tuples
--- End diff --

Milliseconds. I will add it to the Javadoc.




[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-02 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73141485
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java 
---
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import 
org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import 
org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * 
+ * An abstract implementation of the inner join operator. The operator receives tuples from two streams,
+ * applies the join operation based on the constraint and emits the joined value.
+ * Concrete classes should provide implementation to extractKey, 
extractTime, mergeTuples methods.
+ *
+ * Properties:
+ * includeFieldStr: List of comma separated fields to be added to 
the output tuple.
+ * Ex: Field1,Field2;Field3,Field4
+ * keyFields: List of comma separated key field for both the 
streams. Ex: Field1,Field2
--- End diff --

Yes. If there are multiple key fields from each stream, then the primary key 
field has to be specified in keyFields, and the other key fields have to be 
handled in mergeTuples().
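
A hedged sketch of that pattern, reusing the Customer/Order example from the 
Javadoc; the mergeTuples signature, the POJO types and the null-to-skip 
convention are illustrative assumptions, not the actual API:

    public Object mergeTuples(Object tuple1, Object tuple2)
    {
      Customer c = (Customer)tuple1;
      Order o = (Order)tuple2;
      if (!c.getRegion().equals(o.getRegion())) { // secondary key checked here
        return null;                              // assumed convention: no joined tuple
      }
      return new CustomerOrder(c, o);             // primary key (ID = CID) already matched
    }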




[GitHub] apex-core issue #362: APEXCORE-494 Fixed the dynamic partition issue in case...

2016-07-31 Thread chaithu14
Github user chaithu14 commented on the issue:

https://github.com/apache/apex-core/pull/362
  
This fix is related to dynamic partitioning, and the logic for 
re-partitioning is in redoPartitions(). 
Thanks Thomas, it's a good suggestion. I will take another look at it and will 
discuss with you before fixing it.




[GitHub] apex-core pull request #362: APEXCORE-494 Fixed the dynamic partition issue ...

2016-07-31 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-core/pull/362#discussion_r72926469
  
--- Diff: 
engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---
@@ -962,8 +963,11 @@ private void redoPartitions(PMapping currentMapping, 
String note)
   mainPC.operatorIdToPartition.put(p.getId(), newPartition);
 }
 
+// Get the deepest down stream mapping
+PMapping downStreamMapping = currentMapping;
--- End diff --

If the partitionContexts LinkedHashMap exists, then downStreamMapping is the 
last element of that map; otherwise downStreamMapping is the currentMapping.
Let's say the physical DAG before repartitioning is as below:
A1 -> B1 -> C1 -> D1, with parallel partitioning between A -> B and B -> C.

If A is repartitioned to 3, then 
downStreamMapping would be operator C's mapping.
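
For readers following along, a parallel-partitioned chain like the one above is 
typically declared with the PARTITION_PARALLEL port attribute; a minimal sketch 
where dag and the operator variables a, b, c are placeholders:

    dag.addStream("AtoB", a.output, b.input);
    dag.setInputPortAttribute(b.input, Context.PortContext.PARTITION_PARALLEL, true);
    dag.addStream("BtoC", b.output, c.input);
    dag.setInputPortAttribute(c.input, Context.PortContext.PARTITION_PARALLEL, true);
    // Repartitioning A to 3 then yields A1->B1->C1 ... A3->B3->C3, so operator
    // C's mapping is the deepest downstream mapping.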





[GitHub] apex-core issue #362: APEXCORE-494 Fixed the dynamic partition issue in case...

2016-07-26 Thread chaithu14
Github user chaithu14 commented on the issue:

https://github.com/apache/apex-core/pull/362
  
@vrozov Please review and merge.




[GitHub] apex-core pull request #362: APEXCORE-494 Fixed the dynamic partition issue ...

2016-07-26 Thread chaithu14
GitHub user chaithu14 opened a pull request:

https://github.com/apache/apex-core/pull/362

APEXCORE-494 Fixed the dynamic partition issue in case of initial partition 
size is 1



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-core 
APEXCORE-494-DPartIssue

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/362.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #362


commit 3372ad70b63c7086483e6815315089e98703f989
Author: Chaitanya <chaita...@datatorrent.com>
Date:   2016-07-26T12:09:52Z

APEXCORE-494 Fixed the dynamic partition issue in case of initial partition 
size is 1






[GitHub] apex-core issue #359: APEXCORE-494 Fixed the dynamic partition issue in case...

2016-07-22 Thread chaithu14
Github user chaithu14 commented on the issue:

https://github.com/apache/apex-core/pull/359
  
Thanks @vrozov @amberarrow. I will look into the JIRA and will update this PR.




[GitHub] apex-core issue #359: APEXCORE-494 Fixed the dynamic partition issue in case...

2016-07-21 Thread chaithu14
Github user chaithu14 commented on the issue:

https://github.com/apache/apex-core/pull/359
  
@vrozov Can definePartitions() return the current partition? If yes, 
then there might be an issue in redoPartitions().

From the above example, for the logical operator C, the current partition size 
is 2 and the new partition size is 3, which includes the current partitions {C1, C2}. 
The output port of C1 (or C2) maps to the input port of the Unifier.

If the physical DAG is as below:
A1->B1->C1->Aggregator.
Current partition size = 1

If A is repartitioned to 2, then the physical DAG is as below:
A1->B1->C1-> 
 U -> Aggregator
A2->B2->C2->
New partition size = 2

Here, in the initial launch, the output of C1 maps to the input of the Aggregator, 
and after repartitioning, the output of C1 maps to the unifier. The mapping of the 
current partition (A1) has changed after repartitioning. I think this case is not 
covered in redoPartitions().
Please correct me if I am wrong.
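
For reference, a hedged sketch of the scenario being described: a Partitioner 
whose definePartitions() grows the partition set while reusing the current 
partition instance (the operator type and the sizing are illustrative):

    @Override
    public Collection<Partition<MyOperator>> definePartitions(
        Collection<Partition<MyOperator>> partitions, PartitioningContext context)
    {
      // Keep the existing partition (e.g. C1) and add one more (C2); the
      // unifier then appears only after this repartitioning.
      List<Partition<MyOperator>> newPartitions = new ArrayList<>(partitions);
      newPartitions.add(new DefaultPartition<>(new MyOperator()));
      return newPartitions;
    }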




[GitHub] apex-core pull request #359: APEXCORE-494 Fixed the dynamic partition issue ...

2016-07-21 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-core/pull/359#discussion_r71739128
  
--- Diff: 
engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---
@@ -895,8 +895,10 @@ private void redoPartitions(PMapping currentMapping, 
String note)
 addedPartitions.add(newPartition);
   } else {
 // check whether mapping was changed
+int currentPartitionsSize = mainPC.currentPartitions.size();
 for (DefaultPartition pi : mainPC.currentPartitions) {
-  if (pi == newPartition && pi.isModified()) {
+  if (pi == newPartition && (pi.isModified() ||
--- End diff --

@vrozov The problem occurs when definePartitions() returns a list of 
partitions (size > 1) which contains the current partition of size 1. The 
unifier comes into the picture in newPartitions, which was not there in the 
initial launch. Here, the port mapping of the current partition, which is in 
mainPC.newPartitions, has to be changed.




[GitHub] apex-core pull request #359: APEXCORE-494 Fixed the dynamic partition issue ...

2016-07-19 Thread chaithu14
GitHub user chaithu14 opened a pull request:

https://github.com/apache/apex-core/pull/359

APEXCORE-494 Fixed the dynamic partition issue in case of initial partition 
size is 1



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-core 
APEXCORE-494-DPartIssue

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/359.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #359


commit c4578818fb970580310cff1d3a24d6884645c233
Author: Chaitanya <chaita...@datatorrent.com>
Date:   2016-07-19T11:42:48Z

APEXCORE-494 Fixed the dynamic partition issue in case of initial partition 
size is 1






[GitHub] apex-malhar pull request #346: APEXMALHAR-2158 Fixed the duplication of mess...

2016-07-18 Thread chaithu14
GitHub user chaithu14 opened a pull request:

https://github.com/apache/apex-malhar/pull/346

APEXMALHAR-2158 Fixed the duplication of messages emitted issue when the 
Kafka Input operator redeployed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar 
APEXMALHAR-2158-Dpdata-recovery

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/346.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #346


commit 96e0535e9c934e18ae10d71b9ee9fe9c5b9d6805
Author: Chaitanya <chaita...@datatorrent.com>
Date:   2016-07-18T11:27:04Z

APEXMALHAR-2158 Fixed the duplication of messages emitted issue when the 
Kafka Input operator redeployed






[GitHub] apex-malhar pull request #330: Initial cut of Inner Join operator for REVIEW...

2016-07-14 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r70789750
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java
 ---
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.join.managed.ManagedSpillableComplexComponent;
+import com.datatorrent.lib.join.managed.ManagedTimeStateMultiMap;
+import com.datatorrent.netlet.util.Slice;
+
+public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends 
AbstractInnerJoinOperator<K,T> implements
+Operator.CheckpointNotificationListener, 
Operator.CheckpointListener,Operator.IdleTimeHandler
+{
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(AbstractManagedStateInnerJoinOperator.class);
+  public static final String stateDir = "managedState";
+  public static final String stream1State = "stream1Data";
+  public static final String stream2State = "stream2Data";
+  private transient long sleepMillis;
+  private transient Map<JoinEvent<K,T>, Future> waitingEvents = 
Maps.newLinkedHashMap();
+  private int noOfBuckets = 1;
+  private Long bucketSpanTime;
+  protected ManagedTimeStateImpl stream1Store;
+  protected ManagedTimeStateImpl stream2Store;
+
+  @Override
+  public void createStores()
+  {
+stream1Store = new ManagedTimeStateImpl();
+stream2Store = new ManagedTimeStateImpl();
+stream1Store.setNumBuckets(noOfBuckets);
+stream2Store.setNumBuckets(noOfBuckets);
+if (bucketSpanTime != null) {
+  
stream1Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime));
+  
stream2Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime));
+}
+
+if (getExpiryTime() != null) {
+  
stream1Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime()));
+  
stream2Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime()));
+}
+
+component = new ManagedSpillableComplexComponent();
+stream1Data = 
((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream1Store,
 isStream1KeyPrimary());
+stream2Data = 
((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream2Store,
 isStream2KeyPrimary());
+  }
+
+  @Override
+  protected void processTuple(T tuple, boolean isStream1Data)
+  {
+Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? 
stream1Data : stream2Data;
+K key = extractKey(tuple,isStream1Data);
+long timeBucket = extractTime(tuple,isStream1Data);
+((ManagedTimeStateMultiMap)store).put(key, tuple,timeBucket);
+joinStream(key,tuple,isStream1Data);
+  }
+
+  @Override
+  protected void joinStream(K key, T tuple, boolean isStream1Data)
+  {
+Spillable.SpillableByteArrayListMultimap<K, T> store = isStream1Data ? 
stream2Data : stream1Data;
+Future future = ((ManagedTimeStateMultiMap)store).getAsync(key);

[GitHub] apex-malhar pull request #334: APEXMALHAR-2136 1) Fixed the null pointer exc...

2016-07-07 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/334#discussion_r69947853
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
 ---
@@ -536,6 +538,16 @@ public void setDurationPreventingFreeingSpace(Duration 
durationPreventingFreeing
 this.durationPreventingFreeingSpace = durationPreventingFreeingSpace;
   }
 
+  public IncrementalCheckpointManager getCheckpointManager()
+  {
+return checkpointManager;
+  }
+
+  public void setCheckpointManager(IncrementalCheckpointManager 
checkpointManager)
--- End diff --

Updated as per the comment.




[GitHub] apex-malhar pull request #334: APEXMALHAR-2136 1) Fixed the null pointer exc...

2016-07-07 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/334#discussion_r69860038
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
 ---
@@ -203,7 +203,12 @@ public void setup(OperatorContext context)
   //delete all the wal files with windows > activationWindow.
   //All the wal files with windows <= activationWindow are loaded and 
kept separately as recovered data.
   try {
-for (long recoveredWindow : 
checkpointManager.getWindowIds(operatorContext.getId())) {
+long[] recoveredWindows = 
checkpointManager.getWindowIds(operatorContext.getId());
+if (recoveredWindows == null) {
+  readerService = Executors.newFixedThreadPool(numReaders, new 
NameableThreadFactory("managedStateReaders"));
+  return;
+}
+for (long recoveredWindow : recoveredWindows) {
--- End diff --

Done




[GitHub] apex-malhar pull request #334: APEXMALHAR-2136 1) Fixed the null pointer exc...

2016-07-06 Thread chaithu14
GitHub user chaithu14 opened a pull request:

https://github.com/apache/apex-malhar/pull/334

APEXMALHAR-2136 1) Fixed the null pointer exception issue. 2) Added getter 
and setter for IncrementalCheckpointManager



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar 
APEXMALHAR-2136-NPE-RecoverWindows

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/334.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #334


commit 15b4858b72955ff2ce83e1817ac3f64c15347106
Author: Chaitanya <chaita...@datatorrent.com>
Date:   2016-07-07T05:42:45Z

APEXMALHAR-2136 1) Fixed the null pointer exception issue. 2) Added getter 
and setter for IncrementalCheckpointManager






[GitHub] apex-malhar pull request #330: Initial cut of Inner Join operator for REVIEW...

2016-07-05 Thread chaithu14
GitHub user chaithu14 opened a pull request:

https://github.com/apache/apex-malhar/pull/330

Initial cut of Inner Join operator for REVIEW only



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar 
APEXMALHAR-2100-InnerJoin

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/330.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #330


commit 6987dce47a5789e8de578ae3ee77c2cf6507fcc0
Author: Chaitanya <chaita...@datatorrent.com>
Date:   2016-07-05T12:14:46Z

Initial cut of Inner Join operator for REVIEW only






[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-06-29 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r68926798
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
 ---
@@ -0,0 +1,241 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/12/16.
+ */
+public class SpillableByteArrayListMultimapImpl<K, V> implements 
Spillable.SpillableByteArrayListMultimap<K, V>,
+Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, 
(byte)0, (byte)0};
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+
+  private WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>();
+
+  @NotNull
+  private SpillableByteMapImpl<byte[], Integer> map;
+
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde<K, Slice> serdeKey;
+  private Serde<V, Slice> serdeValue;
+
+  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, 
byte[] identifier, long bucket,
+  Serde<K, Slice> serdeKey,
+  Serde<V, Slice> serdeValue)
+  {
+this.store = Preconditions.checkNotNull(store);
+this.identifier = Preconditions.checkNotNull(identifier);
+this.bucket = bucket;
+this.serdeKey = Preconditions.checkNotNull(serdeKey);
+this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+map = new SpillableByteMapImpl(store, identifier, bucket, new 
PassThruByteArraySliceSerde(), new SerdeIntSlice());
+  }
+
+  @Override
+  public List<V> get(@Nullable K key)
+  {
+return getHelper(key);
+  }
+
--- End diff --

I think we also need to provide an asynchronous get method.
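
A hedged sketch of what such a method could look like here, modeled on the 
getAsync() of the managed-state stores; whether SpillableStateStore exposes 
getAsync(long, Slice) this way is an assumption, and the exact return type is 
an open design question:

    public Future<Slice> getAsync(@Nullable K key)
    {
      Slice keySlice = serdeKey.serialize(key);
      return store.getAsync(bucket, keySlice); // assumed store API, as in managed state
    }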




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-06-29 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r68926769
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
 ---
@@ -0,0 +1,241 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/12/16.
+ */
+public class SpillableByteArrayListMultimapImpl<K, V> implements 
Spillable.SpillableByteArrayListMultimap<K, V>,
+Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, 
(byte)0, (byte)0};
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+
+  private WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>();
+
+  @NotNull
+  private SpillableByteMapImpl<byte[], Integer> map;
+
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde<K, Slice> serdeKey;
--- End diff --

Why do we need to configure the bucket?




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-06-27 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r68560022
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
 ---
@@ -0,0 +1,241 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/12/16.
+ */
+public class SpillableByteArrayListMultimapImpl<K, V> implements 
Spillable.SpillableByteArrayListMultimap<K, V>,
+Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, 
(byte)0, (byte)0};
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+
+  private WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>();
+
+  @NotNull
+  private SpillableByteMapImpl<byte[], Integer> map;
+
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde<K, Slice> serdeKey;
+  private Serde<V, Slice> serdeValue;
+
+  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, 
byte[] identifier, long bucket,
+  Serde<K, Slice> serdeKey,
+  Serde<V, Slice> serdeValue)
+  {
+this.store = Preconditions.checkNotNull(store);
+this.identifier = Preconditions.checkNotNull(identifier);
+this.bucket = bucket;
+this.serdeKey = Preconditions.checkNotNull(serdeKey);
+this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+map = new SpillableByteMapImpl(store, identifier, bucket, new 
PassThruByteArraySliceSerde(), new SerdeIntSlice());
+  }
+
+  @Override
+  public List<V> get(@Nullable K key)
+  {
+return getHelper(key);
+  }
+
+  private SpillableArrayListImpl<V> getHelper(@Nullable K key)
+  {
+SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
+
+if (spillableArrayList == null) {
+  Slice keyPrefix = serdeKey.serialize(key);
+  Integer size = map.get(SliceUtils.concatenate(keyPrefix, 
SIZE_KEY_SUFFIX));
+
--- End diff --

Accessing the value from the map should be as follows: 
map.get(SliceUtils.concatenate().buffer)




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-06-27 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r68559313
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java ---
@@ -0,0 +1,52 @@
+package org.apache.apex.malhar.lib.utils.serde;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class SliceUtils
+{
+  private SliceUtils()
+  {
+  }
+
+  public static byte[] concatenate(byte[] a, byte[] b)
+  {
+byte[] output = new byte[a.length + b.length];
+
+System.arraycopy(a, 0, output, 0, a.length);
+System.arraycopy(b, 0, output, a.length, b.length);
+return output;
+  }
+
+  public static Slice concatenate(Slice a, Slice b)
+  {
+int size = a.length + b.length;
+byte[] bytes = new byte[size];
+
+System.arraycopy(a.buffer, a.offset, bytes, 0, a.length);
+System.arraycopy(b.buffer, b.offset, bytes, a.length, b.length);
+
+return new Slice(bytes);
+  }
+
+  public static Slice concatenate(byte[] a, Slice b)
+  {
+int size = a.length + b.length;
+byte[] bytes = new byte[size];
+
+System.arraycopy(a, 0, bytes, 0, a.length);
+System.arraycopy(b.buffer, b.offset, bytes, a.length, b.length);
+
+return new Slice(bytes);
+  }
+
+  public static Slice concatenate(Slice a, byte[] b)
+  {
+int size = a.length + b.length;
+byte[] bytes = new byte[size];
+
+System.arraycopy(a, a.offset, bytes, 0, a.length);
--- End diff --

Use a.buffer instead of a as the first argument here: System.arraycopy 
expects the Slice's backing byte array, not the Slice object itself, so 
passing the Slice would fail at runtime.
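
For reference, the corrected overload would look like this (only the source 
argument of the first arraycopy changes):

    public static Slice concatenate(Slice a, byte[] b)
    {
      int size = a.length + b.length;
      byte[] bytes = new byte[size];

      // Copy from the Slice's backing array, honoring its offset.
      System.arraycopy(a.buffer, a.offset, bytes, 0, a.length);
      System.arraycopy(b, 0, bytes, a.length, b.length);

      return new Slice(bytes);
    }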


---


[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-06-27 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r68559266
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java
 ---
@@ -0,0 +1,191 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+public class SpillableByteMapImpl<K, V> implements 
Spillable.SpillableByteMap<K, V>, Spillable.SpillableComponent
+{
+  @NotNull
+  private SpillableStateStore store;
+  @NotNull
+  private byte[] identifier;
+  private long bucket;
+  @NotNull
+  private Serde<K, Slice> serdeKey;
+  @NotNull
+  private Serde<V, Slice> serdeValue;
+
+  private int size = 0;
+
+  private transient WindowBoundedMapCache<K, V> cache = new 
WindowBoundedMapCache<>();
+  private transient MutableInt tempOffset = new MutableInt();
+
+  private SpillableByteMapImpl()
+  {
+//for kryo
+  }
+
+  public SpillableByteMapImpl(SpillableStateStore store, byte[] 
identifier, long bucket, Serde<K, Slice> serdeKey,
+  Serde<V, Slice> serdeValue)
+  {
+this.store = Preconditions.checkNotNull(store);
+this.identifier = Preconditions.checkNotNull(identifier);
+this.bucket = bucket;
+this.serdeKey = Preconditions.checkNotNull(serdeKey);
+this.serdeValue = Preconditions.checkNotNull(serdeValue);
+  }
+
+  @Override
+  public int size()
+  {
+return size;
+  }
+
+  @Override
+  public boolean isEmpty()
+  {
+return size == 0;
+  }
+
+  @Override
+  public boolean containsKey(Object o)
+  {
+return get(o) != null;
+  }
+
+  @Override
+  public boolean containsValue(Object o)
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public V get(Object o)
+  {
+K key = (K)o;
+
+if (cache.getRemovedKeys().contains(key)) {
+  return null;
+}
+
+V val = cache.get(key);
+
+if (val != null) {
+  return val;
+}
+
+Slice valSlice = store.getSync(bucket, 
SliceUtils.concatenate(identifier, serdeKey.serialize(key)));
+
+if (valSlice == null || valSlice == BucketedState.EXPIRED || 
valSlice.length == 0) {
+  return null;
+}
+
+tempOffset.setValue(valSlice.offset + identifier.length);
+return serdeValue.deserialize(valSlice, tempOffset);
+  }
+
+  @Override
+  public V put(K k, V v)
+  {
+V value = get(k);
+
+if (value == null) {
+  size++;
+}
+
+cache.put(k, v);
+
+return value;
+  }
+
+  @Override
+  public V remove(Object o)
+  {
+V value = get(o);
+
+if (value != null) {
+  size--;
+}
+
+cache.remove((K)o);
+
+return value;
+  }
+
+  @Override
+  public void putAll(Map map)
+  {
+for (Map.Entry entry : map.entrySet()) {
+  put(entry.getKey(), entry.getValue());
+}
+  }
+
+  @Override
+  public void clear()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set keySet()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection values()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set<Entry<K, V>> entrySet()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+  }
--- End diff --

I think store.setup() has to be called in the setup() method, because 
SpillableStateStore extends the Component interface. The same applies to the 
other lifecycle methods: beginWindow, endWindow and teardown.
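
A minimal sketch of the suggested delegation (assuming SpillableStateStore 
exposes the usual operator lifecycle callbacks):

    @Override
    public void setup(Context.OperatorContext context)
    {
      // Initialize the underlying store together with the operator.
      store.setup(context);
    }

    public void beginWindow(long windowId)
    {
      store.beginWindow(windowId);
    }

    public void endWindow()
    {
      store.endWindow();
    }

    @Override
    public void teardown()
    {
      store.teardown();
    }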


---


[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-06-27 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r68559252
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
 ---
@@ -0,0 +1,241 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/12/16.
+ */
+public class SpillableByteArrayListMultimapImpl<K, V> implements 
Spillable.SpillableByteArrayListMultimap<K, V>,
+Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, 
(byte)0, (byte)0};
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+
+  private WindowBoundedMapCache<K, SpillableArrayListImpl> cache = new 
WindowBoundedMapCache<>();
+
+  @NotNull
+  private SpillableByteMapImpl<byte[], Integer> map;
+
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde<K, Slice> serdeKey;
+  private Serde<V, Slice> serdeValue;
+
+  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, 
byte[] identifier, long bucket,
+  Serde<K, Slice> serdeKey,
+  Serde<V, Slice> serdeValue)
+  {
+this.store = Preconditions.checkNotNull(store);
+this.identifier = Preconditions.checkNotNull(identifier);
+this.bucket = bucket;
+this.serdeKey = Preconditions.checkNotNull(serdeKey);
+this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+map = new SpillableByteMapImpl(store, identifier, bucket, new 
PassThruByteArraySliceSerde(), new SerdeIntSlice());
+  }
+
+  @Override
+  public List get(@Nullable K key)
+  {
+return getHelper(key);
+  }
+
+  private SpillableArrayListImpl getHelper(@Nullable K key)
+  {
+SpillableArrayListImpl spillableArrayList = cache.get(key);
+
+if (spillableArrayList == null) {
+  Slice keyPrefix = serdeKey.serialize(key);
+  Integer size = map.get(SliceUtils.concatenate(keyPrefix, 
SIZE_KEY_SUFFIX));
+
+  if (size == null) {
+return null;
+  }
+
+  spillableArrayList = new SpillableArrayListImpl(bucket, 
keyPrefix.buffer, store, serdeValue);
+  spillableArrayList.setSize(size);
+
+  cache.put(key, spillableArrayList);
+}
+
+return spillableArrayList;
+  }
+
+  @Override
+  public Set keySet()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Multiset keys()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection values()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<Map.Entry<K, V>> entries()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List removeAll(@Nullable Object key)
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void clear()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int size()
+  {
+return map.size();
+  }
+
+  @Override
+  public boolean isEmpty()
+  {
+return map.isEmpty();
+  }
+
+  @Override
+  public boolean containsKey(@Nullable Object key)
+  {
+return 
map.containsKey(SliceUtils.concatenate(serdeKey.serialize((K)key), 
SIZE_KEY_SUFFIX));
+  }
+
+  @Override
+  public boolean containsValue(@Nullable Object value)
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean containsEntry(@Nullable Object key, @Nullable Object 
value)
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean put(@Nullable K key, @Nullable V value)
+  {

[GitHub] incubator-apex-malhar pull request #300: APEXMALHAR-2103 Fixed the scanner i...

2016-06-01 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/300#discussion_r65475558
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java ---
@@ -375,11 +375,14 @@ public void run()
 lastScannedInfo = null;
 numDiscoveredPerIteration = 0;
 for (String afile : files) {
-  String filePath = new File(afile).getAbsolutePath();
+  Path filePath = new Path(afile);
   LOG.debug("Scan started for input {}", filePath);
-  Map<String, Long> lastModifiedTimesForInputDir;
-  lastModifiedTimesForInputDir = referenceTimes.get(filePath);
-  scan(new Path(afile), null, lastModifiedTimesForInputDir);
+  Map<String, Long> lastModifiedTimesForInputDir = null;
+  if (fs.exists(filePath)) {
+FileStatus fileStatus = fs.getFileStatus(filePath);
+lastModifiedTimesForInputDir = 
referenceTimes.get(fileStatus.getPath().toUri().getPath());
--- End diff --

No. In createScannedFileInfo(), the directory/file path is stored as an 
absolute path, but on a local file system the configured files might be 
relative paths.

---


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2103 Fixed the scanner issue ...

2016-05-31 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/300#discussion_r65303185
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java ---
@@ -375,11 +374,18 @@ public void run()
 lastScannedInfo = null;
 numDiscoveredPerIteration = 0;
 for (String afile : files) {
-  String filePath = new File(afile).getAbsolutePath();
-  LOG.debug("Scan started for input {}", filePath);
-  Map<String, Long> lastModifiedTimesForInputDir;
-  lastModifiedTimesForInputDir = referenceTimes.get(filePath);
-  scan(new Path(afile), null, lastModifiedTimesForInputDir);
+  Path filePath = new Path(afile);
+  LOG.debug("Scan started for input {}", filePath.toString());
+  Map<String, Long> lastModifiedTimesForInputDir = null;
+  if (fs.exists(filePath)) {
+FileStatus fileStatus = fs.getFileStatus(filePath);
+if (fileStatus.isDirectory()) {
+  lastModifiedTimesForInputDir = 
referenceTimes.get(fileStatus.getPath().toString());
+} else {
+  lastModifiedTimesForInputDir = 
referenceTimes.get(fileStatus.getPath().getParent().toString());
--- End diff --

@Priyanka: If we maintain two different keys, then FileSplitter will emit 
multiple file metadata entries for /home/myDir/file1.txt. Is that the 
expected behavior? Please correct me if I am wrong.
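
To illustrate the concern (hypothetical keys, purely to show how a double 
entry could arise; dirTimes and fileTimes are placeholder maps):

    // If the same file is tracked under both its directory key and its own
    // path key, a scan could see it as undiscovered under one key even after
    // it was recorded under the other, and emit its metadata twice.
    referenceTimes.put("/home/myDir", dirTimes);
    referenceTimes.put("/home/myDir/file1.txt", fileTimes);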


---


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2103 Fixed the scan...

2016-05-29 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/300#discussion_r65023670
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java ---
@@ -375,11 +374,11 @@ public void run()
 lastScannedInfo = null;
 numDiscoveredPerIteration = 0;
 for (String afile : files) {
-  String filePath = new File(afile).getAbsolutePath();
-  LOG.debug("Scan started for input {}", filePath);
+  Path filePath = new Path(afile);
+  LOG.debug("Scan started for input {}", filePath.toString());
--- End diff --

Yes, afile and filePath.toString() represent the same value.


---


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2103 Fixed the scan...

2016-05-29 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:


https://github.com/apache/incubator-apex-malhar/pull/300#discussion_r65023539
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java ---
@@ -375,11 +374,11 @@ public void run()
 lastScannedInfo = null;
 numDiscoveredPerIteration = 0;
 for (String afile : files) {
-  String filePath = new File(afile).getAbsolutePath();
-  LOG.debug("Scan started for input {}", filePath);
+  Path filePath = new Path(afile);
+  LOG.debug("Scan started for input {}", filePath.toString());
   Map<String, Long> lastModifiedTimesForInputDir;
-  lastModifiedTimesForInputDir = referenceTimes.get(filePath);
-  scan(new Path(afile), null, lastModifiedTimesForInputDir);
+  lastModifiedTimesForInputDir = 
referenceTimes.get(fs.getFileStatus(filePath).getPath().toString());
--- End diff --

No. In the case of LocalFileSystem, if the filePath is a relative path then 
the keys again differ. The directory path set in createScannedFileInfo() and 
the key used to read the value back from referenceTimes must stay in sync.
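
A minimal sketch of keeping the two sides in sync (referenceTimesKey is a 
hypothetical helper, not part of the patch):

    // Hypothetical helper: derive the referenceTimes key identically for the
    // writer (createScannedFileInfo) and the reader (scan), so a relative
    // local path resolves to the same absolute form on both sides.
    private String referenceTimesKey(Path path) throws IOException
    {
      return fs.getFileStatus(path).getPath().toUri().getPath();
    }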


---


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2103 Fixed the scan...

2016-05-27 Thread chaithu14
GitHub user chaithu14 opened a pull request:

https://github.com/apache/incubator-apex-malhar/pull/300

APEXMALHAR-2103 Fixed the scanner issue in FileSplitterInput Class



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar 
APEXMALHAR-2103-ScannerIssue

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-apex-malhar/pull/300.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #300


commit 23e91ea0308721585bbdab007bad7adea4592796
Author: Chaitanya <chaita...@datatorrent.com>
Date:   2016-05-27T09:53:05Z

APEXMALHAR-2103 Fixed the scanner issue in FileSplitterInput Class




---


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2103 Fixed the scan...

2016-05-27 Thread chaithu14
Github user chaithu14 closed the pull request at:

https://github.com/apache/incubator-apex-malhar/pull/299


---


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2103 Fixed the scan...

2016-05-27 Thread chaithu14
GitHub user chaithu14 opened a pull request:

https://github.com/apache/incubator-apex-malhar/pull/299

APEXMALHAR-2103 Fixed the scanner issue in FileSplitterInput Class



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar 
APEXMALHAR-2103-ScannerIssue

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-apex-malhar/pull/299.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #299


commit 94d01fd5a264684f6b372600a92c1c7d20b12537
Author: Chaitanya <chaita...@datatorrent.com>
Date:   2016-05-27T06:40:48Z

APEXMALHAR-2103 Fixed the scanner issue in FileSplitterInput Class




---