[GitHub] apex-malhar pull request #642: APEXMALHAR-2497 APEXMALHAR-2162 1) Refactor t...
GitHub user chaithu14 opened a pull request:

    https://github.com/apache/apex-malhar/pull/642

    APEXMALHAR-2497 APEXMALHAR-2162 1) Refactor the Exactly Once output operator. 2) Refactor and fix the issues of unit tests.

    @tushargosavi @tweise Please review and merge.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2497-Kafka10Exactly

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/642.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #642

commit 54565e070c0754c6a86ff1091f33765e40aab9af
Author: chaitanya <chai...@apache.org>
Date:   2017-07-10T06:50:12Z

    APEXMALHAR-2497 APEXMALHAR-2162 1) Refactor the Exactly Once output operator. 2) Refactor and fix the issues of unit tests.
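For context, the KafkaSinglePortExactlyOnceOutputOperator being refactored here is typically wired into an application DAG along the lines of the sketch below. This is illustrative only, not code from the PR: the LineByLineFileInputOperator is just a convenient Malhar source, and the port and setter names (inputPort, setTopic, setProperties) are assumptions based on the malhar-kafka module.

    import java.util.Properties;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator;

    import com.datatorrent.api.DAG;
    import com.datatorrent.api.StreamingApplication;
    import com.datatorrent.lib.io.fs.LineByLineFileInputOperator;

    public class ExactlyOnceOutputApp implements StreamingApplication
    {
      @Override
      public void populateDAG(DAG dag, Configuration conf)
      {
        // Source operator that emits one String tuple per line.
        LineByLineFileInputOperator reader =
            dag.addOperator("reader", new LineByLineFileInputOperator());
        reader.setDirectory("/tmp/input");

        // Exactly-once sink: on recovery, replayed tuples are reconciled
        // against what was already written to the topic to avoid duplicates.
        KafkaSinglePortExactlyOnceOutputOperator<String> kafkaOut =
            dag.addOperator("kafkaOut", new KafkaSinglePortExactlyOnceOutputOperator<String>());
        kafkaOut.setTopic("output-topic");

        // Standard Kafka producer settings are passed through as properties.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        kafkaOut.setProperties(props);

        dag.addStream("lines", reader.output, kafkaOut.inputPort);
      }
    }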
[GitHub] apex-malhar pull request #623: APEXMALHAR-2494 Added description for the dem...
GitHub user chaithu14 opened a pull request:

    https://github.com/apache/apex-malhar/pull/623

    APEXMALHAR-2494 Added description for the demo apps

    @yogidevendra Please review and merge.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2494-UpdateDesc

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/623.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #623

commit 239361cd7abe585c02b4ef781eaa8320733bc445
Author: chaitanya <chai...@apache.org>
Date:   2017-05-19T08:38:52Z

    APEXMALHAR-2494 Added description for the demo apps
[GitHub] apex-malhar pull request #622: APEXMALHAR-2493 Fixed the issue of KafkaSingl...
GitHub user chaithu14 opened a pull request:

    https://github.com/apache/apex-malhar/pull/622

    APEXMALHAR-2493 Fixed the issue of KafkaSinglePortExactlyOnceOutputOperator going to the blocked state during recovery

    @sandeshh @tushargosavi Please review and merge.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2493-KafkaExactlyCBBug

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/622.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #622

commit c784f4da46d1cf594aa4156135b9c196aa66d931
Author: chaitanya <chai...@apache.org>
Date:   2017-05-18T10:37:52Z

    APEXMALHAR-2493 Fixed the issue of KafkaSinglePortExactlyOnceOutputOperator going to the blocked state during recovery
[GitHub] apex-malhar pull request #614: APEXMALHAR-2484 Support of PartFileWriter for...
Github user chaithu14 closed the pull request at:

    https://github.com/apache/apex-malhar/pull/614
[GitHub] apex-malhar pull request #614: APEXMALHAR-2484 Support of PartFileWriter for...
GitHub user chaithu14 opened a pull request:

    https://github.com/apache/apex-malhar/pull/614

    APEXMALHAR-2484 Support of PartFileWriter for writing the part files

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2484-partfilewriter

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/614.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #614

commit d45ea93d8db24af6181d55a5b4e2a13148e4629b
Author: chaitanya <chai...@apache.org>
Date:   2017-04-27T10:00:09Z

    APEXMALHAR-2484 Support of PartFileWriter for writing the part files
[GitHub] apex-malhar pull request #611: APEXMALHAR-2484 Support of PartFileWriter for...
Github user chaithu14 closed the pull request at:

    https://github.com/apache/apex-malhar/pull/611
[GitHub] apex-malhar pull request #611: APEXMALHAR-2484 Support of PartFileWriter for...
GitHub user chaithu14 opened a pull request:

    https://github.com/apache/apex-malhar/pull/611

    APEXMALHAR-2484 Support of PartFileWriter for writing the part files

    @shubham-pathak22 @yogidevendra Please review and merge.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2484-partfilewriter

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/611.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #611

commit 6761f7d1efc9ab34076c625cdc0684c8de200151
Author: chaitanya <chai...@apache.org>
Date:   2017-04-26T09:40:52Z

    APEXMALHAR-2484 PartFileWriter for writing the part files in specified directory
[GitHub] apex-malhar pull request #600: APEXMALHAR-2459 1) Refactor the existing Kafka...
GitHub user chaithu14 opened a pull request:

    https://github.com/apache/apex-malhar/pull/600

    APEXMALHAR-2459 1) Refactor the existing Kafka Input Operator. 2) Added support for KafkaInputOperator using the 0.10 consumer API

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2459-Kafkainput

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/600.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #600
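As background on what the 0.10 consumer API support means for users, configuring this style of Kafka input operator in an application looks roughly like the sketch below. This is a sketch only: the class and setter names (setClusters, setTopics, setInitialOffset) follow the existing org.apache.apex.malhar.kafka module, and the exact package of the new 0.10 variant is an assumption.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;

    import com.datatorrent.api.DAG;
    import com.datatorrent.api.StreamingApplication;
    import com.datatorrent.lib.io.ConsoleOutputOperator;

    public class KafkaInputApp implements StreamingApplication
    {
      @Override
      public void populateDAG(DAG dag, Configuration conf)
      {
        KafkaSinglePortInputOperator kafkaIn =
            dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
        kafkaIn.setClusters("localhost:9092");  // comma-separated bootstrap servers
        kafkaIn.setTopics("input-topic");       // comma-separated topic names
        kafkaIn.setInitialOffset("EARLIEST");   // used when no checkpointed offset exists

        // Stand-in sink so the sketch forms a complete DAG.
        ConsoleOutputOperator console =
            dag.addOperator("console", new ConsoleOutputOperator());
        dag.addStream("messages", kafkaIn.outputPort, console.input);
      }
    }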
[GitHub] apex-malhar pull request #529: APEXMALHAR-2364 Added Documentation for S3Outp...
GitHub user chaithu14 opened a pull request:

    https://github.com/apache/apex-malhar/pull/529

    APEXMALHAR-2364 Added Documentation for S3Output Module

    @amberarrow Please review and merge.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2364-S3OutputDoc

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/529.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #529

commit a8c8cc293bb67dc80a49f29db271e485cb98567c
Author: chaitanya <chai...@apache.org>
Date:   2016-12-27T10:01:22Z

    APEXMALHAR-2364 Added Documentation for S3Output Module
[GitHub] apex-malhar pull request #483: APEXMALHAR-2022 Developed S3 Output Module
GitHub user chaithu14 reopened a pull request:

    https://github.com/apache/apex-malhar/pull/483

    APEXMALHAR-2022 Developed S3 Output Module

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2022-S3Output-multiPart

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/483.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #483

commit a5e8fa3facca750f5d7402c2c29e7cbabe53bd9e
Author: chaitanya <chai...@apache.org>
Date:   2016-11-30T05:17:36Z

    APEXMALHAR-2022 Development of S3 Output Module
[GitHub] apex-malhar pull request #483: APEXMALHAR-2022 Developed S3 Output Module
Github user chaithu14 closed the pull request at:

    https://github.com/apache/apex-malhar/pull/483
[GitHub] apex-malhar pull request #483: APEXMALHAR-2022 Developed S3 Output Module
GitHub user chaithu14 reopened a pull request:

    https://github.com/apache/apex-malhar/pull/483

    APEXMALHAR-2022 Developed S3 Output Module

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2022-S3Output-multiPart

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/483.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #483

commit a5e8fa3facca750f5d7402c2c29e7cbabe53bd9e
Author: chaitanya <chai...@apache.org>
Date:   2016-11-30T05:17:36Z

    APEXMALHAR-2022 Development of S3 Output Module
[GitHub] apex-malhar pull request #483: APEXMALHAR-2022 Developed S3 Output Module
Github user chaithu14 closed the pull request at:

    https://github.com/apache/apex-malhar/pull/483
[GitHub] apex-malhar pull request #483: APEXMALHAR-2022 Developed S3 Output Module
GitHub user chaithu14 reopened a pull request:

    https://github.com/apache/apex-malhar/pull/483

    APEXMALHAR-2022 Developed S3 Output Module

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2022-S3Output-multiPart

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/483.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #483

commit 6ab63bd92dc93ac4ddb3d6ce70d310cfa9322f82
Author: chaitanya <chai...@apache.org>
Date:   2016-11-28T08:54:54Z

    APEXMALHAR-2022 Development of S3 Output Module
[GitHub] apex-malhar pull request #483: APEXMALHAR-2022 Developed S3 Output Module
Github user chaithu14 closed the pull request at:

    https://github.com/apache/apex-malhar/pull/483
[GitHub] apex-malhar pull request #485: APEXMALHAR-2284 *Review Only*
GitHub user chaithu14 opened a pull request:

    https://github.com/apache/apex-malhar/pull/485

    APEXMALHAR-2284 *Review Only* 1) Implemented SpillableMap, SpillableArrayList, SpillableArrayListMultiMap over TimeSlicedBucketState 2) Integrated Spillable data structure into Inner Join operator

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2284-SPDOverTime

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/485.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #485

commit 991e5660f52c266e75440fc0b7c03aa3eb4c05fe
Author: chaitanya <chai...@apache.org>
Date:   2016-11-08T12:57:16Z

    APEXMALHAR-2284 *Review Only* 1) Implemented SpillableMap, SpillableArrayList, SpillableArrayListMultiMap over TimeSlicedBucketState 2) Integrated Spillable data structure into Inner Join operator
[GitHub] apex-malhar pull request #483: APEXMALHAR-2022 Developed S3 Output Module
GitHub user chaithu14 opened a pull request:

    https://github.com/apache/apex-malhar/pull/483

    APEXMALHAR-2022 Developed S3 Output Module

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2022-S3Output-multiPart

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/483.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #483

commit 24fb5638ecb6f0e45edb5d5f640b220ad9372fcc
Author: chaitanya <chai...@apache.org>
Date:   2016-11-04T12:48:21Z

    APEXMALHAR-2022 Developed S3 Output Module
[GitHub] apex-malhar pull request #479: APEXMALHAR-2325 1) Set the file system defaul...
GitHub user chaithu14 opened a pull request:

    https://github.com/apache/apex-malhar/pull/479

    APEXMALHAR-2325 1) Set the file system default block size to the reader. 2) Set the block size to the reader context

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2325-SameBlockID

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/479.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #479

commit f06e3d8b79edbcdcd6c084e989a6bcbc87e210f9
Author: chaitanya <chai...@apache.org>
Date:   2016-11-02T12:27:43Z

    APEXMALHAR-2325 1) Set the file system default block size to the reader. 2) Set the block size to the reader context
[GitHub] apex-malhar pull request #467: APEXMALHAR-2257 Added documentation of Transf...
GitHub user chaithu14 opened a pull request:

    https://github.com/apache/apex-malhar/pull/467

    APEXMALHAR-2257 Added documentation of Transform Operator

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2257-transformDoc

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/467.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #467

commit 04883eeaff504f62af64dd8419596a72ef636be0
Author: chaitanya <chai...@apache.org>
Date:   2016-10-24T08:08:24Z

    APEXMALHAR-2257 Added documentation of Transform Operator
[GitHub] apex-malhar pull request #453: APEXMALHAR-2284 Disabled the POJOInnerJoinOpe...
GitHub user chaithu14 opened a pull request:

    https://github.com/apache/apex-malhar/pull/453

    APEXMALHAR-2284 Disabled the POJOInnerJoinOperatorTest

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2284

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/453.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #453

commit 83ba1a6b5bb3648bf801bb9995de62892e04e1ae
Author: chaitanya <chai...@apache.org>
Date:   2016-10-14T05:50:05Z

    APEXMALHAR-2284 Disabled the POJOInnerJoinOperatorTest
[GitHub] apex-malhar pull request #420: APEXMALHAR-2242 Added user documentation for ...
GitHub user chaithu14 opened a pull request:

    https://github.com/apache/apex-malhar/pull/420

    APEXMALHAR-2242 Added user documentation for 0.9 version of Kafka Input Operator

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2242-kafka0.9doc

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/420.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #420

commit 263c92545d12f6c21625d76e4a18a2ebcae933a8
Author: chaitanya <chai...@apache.org>
Date:   2016-09-21T06:44:50Z

    APEXMALHAR-2242 Added user documentation for 0.9 version of Kafka Input Operator
[GitHub] apex-malhar pull request #401: APEXMALHAR-2226 Fixed the Not supported excep...
GitHub user chaithu14 opened a pull request:

    https://github.com/apache/apex-malhar/pull/401

    APEXMALHAR-2226 Fixed the "Not supported" exception while re-deploying the AbstractFileOutputOperator

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2226-FS-Append

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/401.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #401

commit ce7e574b211b74eb60065a67f98ec5e99b09e022
Author: chaitanya <chai...@apache.org>
Date:   2016-09-07T09:08:31Z

    APEXMALHAR-2226 Fixed the "Not supported" exception while re-deploying the AbstractFileOutputOperator
[GitHub] apex-malhar pull request #379: APEXMALHAR-2134 Fix the NullPointerException,...
Github user chaithu14 closed the pull request at:

    https://github.com/apache/apex-malhar/pull/379
[GitHub] apex-malhar pull request #379: APEXMALHAR-2134 Fix the NullPointerException,...
GitHub user chaithu14 reopened a pull request:

    https://github.com/apache/apex-malhar/pull/379

    APEXMALHAR-2134 Fix the NullPointerException, if the kafka partition has no leader broker

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2134-NP-LB

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/379.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #379

commit 0da1765f4ff12fcda1461f3af06a57450ba8fb4c
Author: chaitanya <chai...@apache.org>
Date:   2016-08-25T19:21:43Z

    APEXMALHAR-2134 Fix the NullPointerException, if the kafka partition has no leader broker
[GitHub] apex-malhar pull request #380: APEXMALHAR-2199 Added support for chroot zook...
GitHub user chaithu14 opened a pull request:

    https://github.com/apache/apex-malhar/pull/380

    APEXMALHAR-2199 Added support for chroot zookeeper path in Kafka Input Operator

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2199-zkPath

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/380.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #380

commit 82a6acbbd4c46e3e940d25366a2ca84c7dea4b8e
Author: chaitanya <chai...@apache.org>
Date:   2016-08-25T18:58:31Z

    APEXMALHAR-2199 Added support for chroot zookeeper path in Kafka Input Operator
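For readers unfamiliar with chroot'ed ZooKeeper setups: Kafka brokers can register under a path suffix (e.g. host:2181/kafka rather than host:2181), so broker metadata lives under /kafka/brokers instead of /brokers, and this change lets the operator's ZooKeeper connect string carry that suffix. Below is a hedged sketch against the zookeeper-based operator in com.datatorrent.contrib.kafka; the consumer wiring shown (SimpleKafkaConsumer, setZookeeper, setTopic, setConsumer) is an assumption about that module's API, not code from the PR.

    // Inside populateDAG(): the trailing "/kafka" below is the chroot path.
    KafkaSinglePortStringInputOperator kafkaIn =
        dag.addOperator("kafkaIn", new KafkaSinglePortStringInputOperator());

    SimpleKafkaConsumer consumer = new SimpleKafkaConsumer();
    // Comma-separated ZooKeeper ensemble plus the chroot suffix.
    consumer.setZookeeper("zk1:2181,zk2:2181,zk3:2181/kafka");
    consumer.setTopic("input-topic");
    kafkaIn.setConsumer(consumer);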
[GitHub] apex-malhar pull request #379: APEXMALHAR-2134 Fix the NullPointerException,...
GitHub user chaithu14 opened a pull request:

    https://github.com/apache/apex-malhar/pull/379

    APEXMALHAR-2134 Fix the NullPointerException, if the kafka partition has no leader broker

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2134-NP-LB

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/379.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #379

commit 8b662fb11f0b77373d302c58b4522011806bbfa9
Author: chaitanya <chai...@apache.org>
Date:   2016-08-25T17:41:17Z

    APEXMALHAR-2134 Fix the NullPointerException, if the kafka partition has no leader broker
[GitHub] apex-malhar pull request #373: APEXMALHAR-2154 Update the Kafka Input Operat...
GitHub user chaithu14 opened a pull request:

    https://github.com/apache/apex-malhar/pull/373

    APEXMALHAR-2154 Update the Kafka Input Operator to use CheckpointNotificationListener

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2154

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/373.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #373

commit 9d15fe29287697ff2b660e637c495d76a01ab910
Author: chaitanya <chai...@apache.org>
Date:   2016-08-18T07:23:40Z

    APEXMALHAR-2154 Update the Kafka Input Operator to use CheckpointNotificationListener
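For context, Operator.CheckpointNotificationListener (in the Apex core API) extends the older CheckpointListener with a beforeCheckpoint() callback that fires just before operator state is serialized, which is where an input operator can fold in-memory Kafka offsets into its checkpointed state. The class below is an illustrative sketch of the three callbacks, not the PR's code:

    import com.datatorrent.api.Operator;
    import com.datatorrent.common.util.BaseOperator;

    public class OffsetTrackingOperator extends BaseOperator
        implements Operator.CheckpointNotificationListener
    {
      @Override
      public void beforeCheckpoint(long windowId)
      {
        // Invoked before state is serialized: snapshot in-memory offsets
        // into checkpointed fields here, so they are saved atomically
        // with the rest of the operator state.
      }

      @Override
      public void checkpointed(long windowId)
      {
        // State up to windowId has been persisted.
      }

      @Override
      public void committed(long windowId)
      {
        // All operators have checkpointed past windowId; older recovery
        // state can safely be purged.
      }
    }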
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
GitHub user chaithu14 reopened a pull request:

    https://github.com/apache/apex-malhar/pull/330

    APEXMALHAR-2100 Implementation of Inner Join operator

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2100-InnerJoin

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/330.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #330

commit 755aadae8814887431cd66b55bd1e03591afb5c6
Author: Chaitanya <chaita...@datatorrent.com>
Date:   2016-08-17T10:22:24Z

    APEXMALHAR-2100 Implementation of Inner Join operator
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
GitHub user chaithu14 reopened a pull request:

    https://github.com/apache/apex-malhar/pull/330

    APEXMALHAR-2100 Implementation of Inner Join operator

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2100-InnerJoin

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/330.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #330

commit 363287a2cca2a0a5dbb9096b4d971719e71d5342
Author: Chaitanya <chaita...@datatorrent.com>
Date:   2016-08-17T07:08:23Z

    APEXMALHAR-2100 Implementation of Inner Join operator
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 closed the pull request at:

    https://github.com/apache/apex-malhar/pull/330
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 closed the pull request at:

    https://github.com/apache/apex-malhar/pull/330
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
GitHub user chaithu14 reopened a pull request:

    https://github.com/apache/apex-malhar/pull/330

    APEXMALHAR-2100 Implementation of Inner Join operator

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2100-InnerJoin

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/330.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #330

commit 22e165c2cc8710c4a5135d64cb6a684cf0866c07
Author: Chaitanya <chaita...@datatorrent.com>
Date:   2016-08-10T14:01:28Z

    APEXMALHAR-2100 Implementation of Inner Join operator
[GitHub] apex-malhar pull request #360: APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 r...
Github user chaithu14 closed the pull request at:

    https://github.com/apache/apex-malhar/pull/360
[GitHub] apex-malhar pull request #360: APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 r...
GitHub user chaithu14 reopened a pull request:

    https://github.com/apache/apex-malhar/pull/360

    APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 reader issue

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2174-S3-ReaderIssue

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/360.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #360

commit 0c70e92e6f2a1a631569d6b9608ac79de0a50b96
Author: Chaitanya <chaita...@datatorrent.com>
Date:   2016-08-08T06:21:11Z

    APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 reader issue
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/330#discussion_r73844379

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java ---
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Concrete implementation of SpillableByteArrayListMultimap which is needed for join operator.
+ *
+ * Properties:
+ * isKeyContainsMultiValue: Specifies whether the key has multiple values or not.
+ * timeBucket: Specifies the length of the time bucket.
+ *
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableByteArrayListMultimap<K,V>
+{
+  private transient StreamCodec streamCodec = null;
+  private boolean isKeyContainsMultiValue = false;
+  private long timeBucket;
+  @NotNull
+  private ManagedTimeStateImpl store;
+
+  public ManagedTimeStateMultiValue()
+  {
+    if (streamCodec == null) {
+      streamCodec = new KryoSerializableStreamCodec();
+    }
+  }
+
+  public ManagedTimeStateMultiValue(@NotNull ManagedTimeStateImpl store, boolean isKeyContainsMultiValue)
+  {
+    this();
+    this.store = Preconditions.checkNotNull(store);
+    this.isKeyContainsMultiValue = isKeyContainsMultiValue;
+  }
+
+  /**
+   * Return the list of values from the store
+   * @param k given key
+   * @return list of values
+   */
+  @Override
+  public List get(@Nullable K k)
+  {
+    List value = null;
+    Slice valueSlice = store.getSync(getBucketId(k), streamCodec.toByteArray(k));
+    if (valueSlice == null || valueSlice.length == 0 || valueSlice.buffer == null) {
+      return null;
+    }
+    if (isKeyContainsMultiValue) {
+      return (List)streamCodec.fromByteArray(valueSlice);
+    }
+    value = new ArrayList<>();
+    value.add((V)streamCodec.fromByteArray(valueSlice));
+    return value;
+  }
+
+  /**
+   * Returns the Future from the store.
+   * @param k given key
+   * @return
+   */
+  public CompositeFuture getAsync(@Nullable K k)
+  {
+    return new CompositeFuture(store.getAsync(getBucketId(k), streamCodec.toByteArray(k)));
+  }
+
+  @Override
+  public Set keySet()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Multiset keys()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection values()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<Map.Entry<K, V>> entries()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List removeAll(@Nullable Object o)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void clear()
+  {
+
+  }
+
+  @Override
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/330#discussion_r73844212

--- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java ---
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ *
+ * An abstract implementation of inner join operator. Operator receives tuples from two streams,
+ * applies the join operation based on constraint and emits the joined value.
+ * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods.
+ *
+ * Properties:
+ * includeFieldStr: List of comma separated fields to be added to the output tuple.
+ * Ex: Field1,Field2;Field3,Field4
+ * keyFields: List of comma separated key field for both the streams. Ex: Field1,Field2
+ * timeFields: List of comma separated time field for both the streams. Ex: Field1,Field2
+ * expiryTime: Expiry time for stored tuples
+ * isStream1KeyPrimary: Specifies whether the stream1 key is primary or not
+ * isStream2KeyPrimary: Specifies whether the stream2 key is primary or not
+ *
+ * Example:
+ * Left input port receives customer details and right input port receives Order details.
+ * Schema for the Customer is in the form of {ID, Name, CTime}
+ * Schema for the Order is in the form of {OID, CID, OTime}
+ * Now, join the tuples of Customer and Order streams where Customer.ID = Order.CID and the constraint is
+ * matched tuples must have timestamp within 5 minutes.
+ * Here, key Fields = ID, CID and Time Fields = CTime, OTime, expiryTime = 5 minutes
+ *
+ * @displayName Abstract Inner Join Operator
+ * @tags join
+ */
+public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator
+{
+  protected transient String[][] includeFields;
+  protected transient List keyFields;
+  protected transient List timeFields;
+  @AutoMetric
+  private long tuplesJoinedPerSec;
+  private double windowTimeSec;
+  private int tuplesCount;
+  @NotNull
+  private String keyFieldsStr;
+  @NotNull
+  private String includeFieldStr;
+  private String timeFieldsStr;
+  private Long stream1ExpiryTime;
+  private Long stream2ExpiryTime;
+  private boolean isStream1KeyPrimary = true;
+  private boolean isStream2KeyPrimary = true;
+  protected SpillableComplexComponent component;
+  protected Spillable.SpillableByteArrayListMultimap<K,T> stream1Data;
+  protected Spillable.SpillableByteArrayListMultimap<K,T> stream2Data;
+
+  /**
+   * Process the tuple which is received from input ports with the following steps:
+   * 1) Extract key from the given tuple
+   * 2) Insert <key,tuple> into the store where store is the stream1Data if the tuple
+   * arrives from stream1, or vice versa.
+   * 3) Get the values of the key if found in the opposite store
+   * 4) Merge the given tuple and values found from step (3)
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+   */
+  protected void processTuple(T tuple, boolean isStream1Data)
+  {
+    Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data;
+    K key = extractKey
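The quoted excerpt is cut off mid-method by the archive. Following the four steps in the javadoc above, the body would continue roughly as below. This is a reconstruction, not the PR's verbatim code: the put()/get() calls and the joinStream() helper beyond the visible fragment are assumptions.

    protected void processTuple(T tuple, boolean isStream1Data)
    {
      // 1) Pick the store for the stream this tuple arrived on.
      Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data;

      // 2) Extract the key and insert <key, tuple> into that store.
      K key = extractKey(tuple, isStream1Data);
      store.put(key, tuple);

      // 3) Look up the same key in the opposite stream's store.
      Spillable.SpillableByteArrayListMultimap<K,T> other = isStream1Data ? stream2Data : stream1Data;
      List<T> matches = other.get(key);

      // 4) Merge the tuple with each matching value and emit the result.
      if (matches != null) {
        joinStream(tuple, isStream1Data, matches);
      }
    }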
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/330#discussion_r73844311

--- Diff: library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator which receives objects from both streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator<Object,Object> implements Operator.ActivationListener
+{
+  private static final transient Logger LOG = LoggerFactory.getLogger(POJOInnerJoinOperator.class);
+  private transient long timeIncrement;
+  private transient FieldObjectMap[] inputFieldObjects = (FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2);
+  protected transient Class outputClass;
+  private long time = System.currentTimeMillis();
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort outputPort = new DefaultOutputPort()
+  {
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input1 = new DefaultInputPort()
+  {
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      inputFieldObjects[0].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+
+    @Override
+    public void process(Object tuple)
+    {
+      processTuple(tuple,true);
+    }
+
+    @Override
+    public StreamCodec getStreamCodec()
+    {
+      return getInnerJoinStreamCodec(true);
+    }
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input2 = new DefaultInputPort()
+  {
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      inputFieldObjects[1].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+
+    @Override
+    public void process(Object tuple)
+    {
+      processTuple(tuple,false);
+    }
+
+    @Override
+    public StreamCodec getStreamCodec()
+    {
+      return getInnerJoinStreamCodec(false);
+    }
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+        context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+    super.setup(context);
+    for (int i = 0; i < 2; i++) {
+      inputFieldObjects[i] = new FieldObjectMap();
+    }
+  }
+
+  /**
+   * Extract the time value from the given tuple
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+   * @return
+   */
+  @Override
+  public long extractTime(Object tuple, boolean isStream1Data)
+  {
+    ret
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/330#discussion_r73844328

--- Diff: library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---

[quoted diff context identical to the excerpt above; truncated in the archive]
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/330#discussion_r73844275

--- Diff: library/src/main/java/com/datatorrent/lib/join/InnerJoinStreamCodec.java ---
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Stream codec based on keyExpression for POJO Inner Join Operator.
+ *
+ * @Since 3.5.0
--- End diff --

Done
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/330#discussion_r73844228

--- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java ---
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue;
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.hadoop.fs.Path;
+import com.google.common.collect.Maps;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * An abstract implementation of inner join operator over Managed state which extends from
+ * AbstractInnerJoinOperator.
+ *
+ * Properties:
+ * noOfBuckets: Number of buckets required for Managed state.
+ * bucketSpanTime: Indicates the length of the time bucket.
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends AbstractInnerJoinOperator<K,T> implements
+    Operator.CheckpointNotificationListener, Operator.CheckpointListener,Operator.IdleTimeHandler
--- End diff --

Done
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73844300 --- Diff: library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.lang.reflect.Array; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.ClassUtils; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.util.PojoUtils; + +/** + * Concrete implementation of AbstractManagedStateInnerJoinOperator and receives objects from both streams. 
+ * + * @displayName POJO Inner Join Operator + * @tags join + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator<Object,Object> implements Operator.ActivationListener +{ + private static final transient Logger LOG = LoggerFactory.getLogger(POJOInnerJoinOperator.class); + private transient long timeIncrement; + private transient FieldObjectMap[] inputFieldObjects = (FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2); + protected transient Class outputClass; + private long time = System.currentTimeMillis(); + + @OutputPortFieldAnnotation(schemaRequired = true) + public final transient DefaultOutputPort outputPort = new DefaultOutputPort() + { +@Override +public void setup(Context.PortContext context) +{ + outputClass = context.getValue(Context.PortContext.TUPLE_CLASS); +} + }; + + @InputPortFieldAnnotation(schemaRequired = true) + public transient DefaultInputPort input1 = new DefaultInputPort() + { +@Override +public void setup(Context.PortContext context) +{ + inputFieldObjects[0].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS); +} + +@Override +public void process(Object tuple) +{ + processTuple(tuple,true); +} + +@Override +public StreamCodec getStreamCodec() +{ + return getInnerJoinStreamCodec(true); +} + }; + + @InputPortFieldAnnotation(schemaRequired = true) + public transient DefaultInputPort input2 = new DefaultInputPort() + { +@Override +public void setup(Context.PortContext context) +{ + inputFieldObjects[1].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS); +} + +@Override +public void process(Object tuple) +{ + processTuple(tuple,false); +} + +@Override +public StreamCodec getStreamCodec() +{ + return getInnerJoinStreamCodec(false); +} + }; + + @Override + public void setup(Context.OperatorContext context) + { +timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) * + context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS); +super.setup(context); +for (int i = 0; i < 2; i++) { + inputFieldObjects[i] = new FieldObjectMap(); +} + } + + /** + * Extract the time value from the given tuple + * @param tuple given tuple + * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not. + * @return + */ + @Override + public long extractTime(Object tuple, boolean isStream1Data) + { +ret
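One detail worth calling out from the ports in the diff above: both inputs override getStreamCodec() so that the platform partitions each stream on the join key, which is what lets matching tuples from the two streams meet in the same physical partition. A hedged sketch of such a key-based codec follows; the getter wiring is an assumption, and the PR's actual codec class is not shown in this excerpt:

    import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
    import com.datatorrent.lib.util.PojoUtils;

    public class KeyBasedStreamCodec extends KryoSerializableStreamCodec<Object>
    {
      // Assumed to be built elsewhere from the configured key field name,
      // e.g. via PojoUtils.createGetter(...).
      private transient PojoUtils.Getter<Object, Object> keyGetter;

      @Override
      public int getPartition(Object tuple)
      {
        // Tuples with equal join keys hash to the same partition.
        return keyGetter.get(tuple).hashCode();
      }
    }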
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73844285 --- Diff: library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.lang.reflect.Array; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.ClassUtils; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.util.PojoUtils; + +/** + * Concrete implementation of AbstractManagedStateInnerJoinOperator and receives objects from both streams. + * + * @displayName POJO Inner Join Operator + * @tags join + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator<Object,Object> implements Operator.ActivationListener +{ + private static final transient Logger LOG = LoggerFactory.getLogger(POJOInnerJoinOperator.class); --- End diff -- Done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73844252 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java --- @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue; +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.hadoop.fs.Path; +import com.google.common.collect.Maps; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * An abstract implementation of inner join operator over Managed state which extends from + * AbstractInnerJoinOperator. + * + * Properties: + * noOfBuckets: Number of buckets required for Managed state. + * bucketSpanTime: Indicates the length of the time bucket. + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends AbstractInnerJoinOperator<K,T> implements +Operator.CheckpointNotificationListener, Operator.CheckpointListener,Operator.IdleTimeHandler +{ + private static final transient Logger LOG = LoggerFactory.getLogger(AbstractManagedStateInnerJoinOperator.class); + public static final String stateDir = "managedState"; + public static final String stream1State = "stream1Data"; + public static final String stream2State = "stream2Data"; + private transient Map<JoinEvent<K,T>, Future> waitingEvents = Maps.newLinkedHashMap(); + private int noOfBuckets = 1; + private Long bucketSpanTime; + protected ManagedTimeStateImpl stream1Store; + protected ManagedTimeStateImpl stream2Store; + + /** + * Create Managed states and stores for both the streams. 
+ */ + @Override + public void createStores() + { +stream1Store = new ManagedTimeStateImpl(); +stream2Store = new ManagedTimeStateImpl(); +stream1Store.setNumBuckets(noOfBuckets); +stream2Store.setNumBuckets(noOfBuckets); +if (bucketSpanTime != null) { + stream1Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime)); + stream2Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime)); +} + stream1Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime())); + stream2Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime())); + +component = new ManagedSpillableComplexComponent(); +stream1Data = ((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream1Store, !isStream1KeyPrimary()); +stream2Data = ((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream2Store, !isStream2KeyPrimary()); + } + + /** + * Process the tuple which are received from input ports with the following steps: + * 1) Extract key from the given tuple + * 2) Insert <key,tuple> into the store where store is the stream1Data if the tuple + * receives from stream1 or viceversa. + * 3) Get the values of the key in asynchronous if found it in opposite stor
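The numbered steps in the (truncated) javadoc above describe an asynchronous lookup-and-merge flow. A condensed, hypothetical sketch of that flow is below; the field and method names come from the diff, but the JoinEvent constructor and the merge helper are assumptions, and the real method also handles time extraction and metrics:

    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;

    protected void processTuple(T tuple, boolean isStream1Data)
    {
      // 1) extract the key and 2) store the tuple on its own side
      K key = extractKey(tuple, isStream1Data);
      (isStream1Data ? stream1Data : stream2Data).put(key, tuple);

      // 3) asynchronous lookup against the opposite stream's store
      ManagedTimeStateMultiValue<K,T> opposite =
          (ManagedTimeStateMultiValue<K,T>)(isStream1Data ? stream2Data : stream1Data);
      Future<List> future = opposite.getAsync(key);

      if (future.isDone()) {
        try {
          // 4) values already in memory: merge and emit right away
          joinAndEmit(tuple, isStream1Data, (List<T>)future.get()); // hypothetical merge helper
        } catch (InterruptedException | ExecutionException e) {
          throw new RuntimeException(e);
        }
      } else {
        // otherwise park the event; waitingEvents is drained later,
        // e.g. from handleIdleTime() or at window boundaries
        waitingEvents.put(new JoinEvent<>(key, tuple), future); // JoinEvent ctor assumed
      }
    }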
[GitHub] apex-malhar pull request #351: APEXMALHAR-2169 Fixed the issue of dynamic pa...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/351#discussion_r73824988 --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java --- @@ -188,6 +188,8 @@ @Min(1) private int initialPartitionCount = 1; + private boolean isPartitionBasedOnLoad = false; --- End diff -- Can this be achieved as follows? Please correct me if I am wrong. if (msgRateUpperBound != Long.MAX_VALUE) { // Dynamic partition based on load is enabled } else { // Dynamic partition based on load is disabled } --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
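For completeness, the sentinel idea in the comment above as a self-contained fragment; the field name is taken from the discussion, and the default value expresses "no bound configured":

    // Long.MAX_VALUE doubles as the "unset" sentinel.
    private long msgRateUpperBound = Long.MAX_VALUE;

    private boolean isPartitionBasedOnLoad()
    {
      // Load-based dynamic partitioning is enabled only when the user has
      // configured an explicit message-rate upper bound.
      return msgRateUpperBound != Long.MAX_VALUE;
    }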
[GitHub] apex-malhar pull request #351: APEXMALHAR-2169 Fixed the issue of dynamic pa...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/351#discussion_r73824992 --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java --- @@ -834,17 +840,16 @@ private boolean isPartitionRequired(int opid, List
[GitHub] apex-malhar pull request #360: APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 r...
GitHub user chaithu14 opened a pull request: https://github.com/apache/apex-malhar/pull/360 APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 reader issue You can merge this pull request into a Git repository by running: $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2174-S3-ReaderIssue Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-malhar/pull/360.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #360 commit 01b0f42d1d0ab2e6030e390e10e1dafba72f3302 Author: Chaitanya <chaita...@datatorrent.com> Date: 2016-08-03T10:13:30Z APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 reader issue --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73290027 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java --- @@ -0,0 +1,357 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.managed; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multiset; + +import com.datatorrent.api.StreamCodec; +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; +import com.datatorrent.netlet.util.Slice; + +/** + * Concrete implementation of SpillableByteArrayListMultimap which is needed for join operator. + * + * Properties: + * isKeyContainsMultiValue: Specifies whether the key has multiple value or not. + * timeBucket: Specifies the lenght of the time bucket. + * + */ +public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableByteArrayListMultimap<K,V> +{ + private transient StreamCodec streamCodec = null; + private boolean isKeyContainsMultiValue = false; + private long timeBucket; + @NotNull + private ManagedTimeStateImpl store; + + public ManagedTimeStateMultiValue() + { +if (streamCodec == null) { + streamCodec = new KryoSerializableStreamCodec(); +} + } + + public ManagedTimeStateMultiValue(@NotNull ManagedTimeStateImpl store, boolean isPrimaryKey) + { +this(); +this.store = Preconditions.checkNotNull(store); +this.isKeyContainsMultiValue = isPrimaryKey; + } + + /** + * Return the list of values from the store + * @param k given key + * @return list of values + */ + @Override + public List get(@Nullable K k) + { +List value = null; +Slice valueSlice = store.getSync(getBucketId(k), streamCodec.toByteArray(k)); +if (isKeyContainsMultiValue) { + value = (List)streamCodec.fromByteArray(valueSlice); +} else { + if (valueSlice == null || valueSlice.length == 0 || valueSlice.buffer == null) { +return null; + } + value = new ArrayList<>(); + value.add((V)streamCodec.fromByteArray(valueSlice)); +} +return value; + } + + /** + * Returns the Future form the store. 
+ * @param k given key + * @return + */ + public CompositeFuture getAsync(@Nullable K k) + { +return new CompositeFuture(store.getAsync(getBucketId(k), streamCodec.toByteArray(k))); + } + + @Override + public Set keySet() + { +throw new UnsupportedOperationException(); + } + + @Override + public Multiset keys() + { +throw new UnsupportedOperationException(); + } + + @Override + public Collection values() + { +throw new UnsupportedOperationException(); + } + + @Override + public Collection<Map.Entry<K, V>> entries() + { +throw new UnsupportedOperationException(); + } + + @Override + public List removeAll(@Nullable Object o) + { +throw new UnsupportedOperationException(); + } + + @Override + public void clear() + { + + } + + @Override + public int size() + { +throw new UnsupportedOpe
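The get()/getAsync() pair in the diff above gives callers a blocking and a non-blocking read over the same managed store. A hedged usage sketch (key/value types are illustrative, and the exact Future type returned by getAsync is the store's CompositeFuture wrapper):

    import java.util.List;
    import java.util.concurrent.Future;

    void lookup(ManagedTimeStateMultiValue<String, String> state) throws Exception
    {
      // Synchronous: blocks until the bucket holding the key has been read.
      List<String> now = state.get("customer-42");

      // Asynchronous: returns immediately; the Future completes once the
      // bucket data has been fetched from disk.
      Future<List> later = state.getAsync("customer-42");
      if (later.isDone()) {
        List<String> values = (List<String>)later.get();
      }
    }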
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73289926 --- Diff: library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java --- @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.lang.reflect.Array; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.ClassUtils; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.util.PojoUtils; + +/** + * Concrete implementation of AbstractManagedStateInnerJoinOperator and receives objects from both streams. + * + * @displayName POJO Inner Join Operator + * @tags join + */ +public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator<Object,Object> implements Operator.ActivationListener +{ + private static final transient Logger LOG = LoggerFactory.getLogger(POJOInnerJoinOperator.class); + private transient long timeIncrement; + private transient FieldObjectMap[] inputFieldObjects = (FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2); + protected Class outputClass; + private long time = System.currentTimeMillis(); --- End diff -- Yes. It will be restored from the checkpointed state when the operator crashes and comes back. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
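The distinction behind the reply above is between transient and non-transient operator fields. A minimal illustration of what survives a crash, assuming the platform's default checkpointing of non-transient state:

    public class RecoveryExample
    {
      // Checkpointed: restored to its pre-crash value on recovery.
      private long time = System.currentTimeMillis();

      // Not checkpointed: null after recovery until setup() rebuilds it.
      protected transient Class<?> outputClass;
    }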
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73289947 --- Diff: library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java --- @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.lang.reflect.Array; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.ClassUtils; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.util.PojoUtils; + +/** + * Concrete implementation of AbstractManagedStateInnerJoinOperator and receives objects from both streams. 
+ * + * @displayName POJO Inner Join Operator + * @tags join + */ +public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator<Object,Object> implements Operator.ActivationListener +{ + private static final transient Logger LOG = LoggerFactory.getLogger(POJOInnerJoinOperator.class); + private transient long timeIncrement; + private transient FieldObjectMap[] inputFieldObjects = (FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2); + protected Class outputClass; + private long time = System.currentTimeMillis(); + + @OutputPortFieldAnnotation(schemaRequired = true) + public final transient DefaultOutputPort outputPort = new DefaultOutputPort() + { +@Override +public void setup(Context.PortContext context) +{ + outputClass = context.getValue(Context.PortContext.TUPLE_CLASS); +} + }; + + @InputPortFieldAnnotation(schemaRequired = true) + public transient DefaultInputPort input1 = new DefaultInputPort() + { +@Override +public void setup(Context.PortContext context) +{ + inputFieldObjects[0].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS); +} + +@Override +public void process(Object tuple) +{ + processTuple(tuple,true); +} + +@Override +public StreamCodec getStreamCodec() +{ + return getInnerJoinStreamCodec(true); +} + }; + + @InputPortFieldAnnotation(schemaRequired = true) + public transient DefaultInputPort input2 = new DefaultInputPort() + { +@Override +public void setup(Context.PortContext context) +{ + inputFieldObjects[1].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS); +} + +@Override +public void process(Object tuple) +{ + processTuple(tuple,false); +} + +@Override +public StreamCodec getStreamCodec() +{ + return getInnerJoinStreamCodec(false); +} + }; + + @Override + public void setup(Context.OperatorContext context) + { +timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) * + context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS); +super.setup(context); +for (int i = 0; i < 2; i++) { + inputFieldObjects[i] = new FieldObjectMap(); +} + } + + /** + * Extract the time value from the given tuple + * @param tuple given tuple + * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not. + * @return + */ + @Override + public long extractTime(Object tuple, boolean isStream1Data) + { +return timeFields == null ? time : (long)(isStream1Data ? inputFieldObje
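extractTime() is cut off in the excerpt above; judging from the visible ternary, a hedged reconstruction of its intent is below. The timeFieldGet handle inside FieldObjectMap is an assumed name, not confirmed by the excerpt:

    @Override
    public long extractTime(Object tuple, boolean isStream1Data)
    {
      // With no time fields configured, stamp tuples with the operator clock
      // (advanced by timeIncrement as windows pass); otherwise read the
      // configured time field from the matching stream's POJO.
      return timeFields == null ? time
          : (long)(isStream1Data ? inputFieldObjects[0].timeFieldGet.get(tuple)
                                 : inputFieldObjects[1].timeFieldGet.get(tuple));
    }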
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73289992 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java --- @@ -0,0 +1,357 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.managed; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multiset; + +import com.datatorrent.api.StreamCodec; +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; +import com.datatorrent.netlet.util.Slice; + +/** + * Concrete implementation of SpillableByteArrayListMultimap which is needed for join operator. + * + * Properties: + * isKeyContainsMultiValue: Specifies whether the key has multiple value or not. + * timeBucket: Specifies the lenght of the time bucket. + * + */ +public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableByteArrayListMultimap<K,V> +{ + private transient StreamCodec streamCodec = null; + private boolean isKeyContainsMultiValue = false; + private long timeBucket; + @NotNull + private ManagedTimeStateImpl store; + + public ManagedTimeStateMultiValue() + { +if (streamCodec == null) { + streamCodec = new KryoSerializableStreamCodec(); +} + } + + public ManagedTimeStateMultiValue(@NotNull ManagedTimeStateImpl store, boolean isPrimaryKey) + { +this(); +this.store = Preconditions.checkNotNull(store); +this.isKeyContainsMultiValue = isPrimaryKey; + } + + /** + * Return the list of values from the store + * @param k given key + * @return list of values + */ + @Override + public List get(@Nullable K k) + { +List value = null; +Slice valueSlice = store.getSync(getBucketId(k), streamCodec.toByteArray(k)); +if (isKeyContainsMultiValue) { + value = (List)streamCodec.fromByteArray(valueSlice); +} else { + if (valueSlice == null || valueSlice.length == 0 || valueSlice.buffer == null) { --- End diff -- Yes. Done. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73289869 --- Diff: library/src/main/java/com/datatorrent/lib/join/ManagedSpillableComplexComponent.java --- @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.utils.serde.Serde; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * SpillableComplexComponent for Join Operator + */ +public class ManagedSpillableComplexComponent implements SpillableComplexComponent --- End diff -- Done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73289943 --- Diff: library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java --- @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.lang.reflect.Array; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.ClassUtils; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.util.PojoUtils; + +/** + * Concrete implementation of AbstractManagedStateInnerJoinOperator and receives objects from both streams. 
+ * + * @displayName POJO Inner Join Operator + * @tags join + */ +public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator<Object,Object> implements Operator.ActivationListener +{ + private static final transient Logger LOG = LoggerFactory.getLogger(POJOInnerJoinOperator.class); + private transient long timeIncrement; + private transient FieldObjectMap[] inputFieldObjects = (FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2); + protected Class outputClass; + private long time = System.currentTimeMillis(); + + @OutputPortFieldAnnotation(schemaRequired = true) + public final transient DefaultOutputPort outputPort = new DefaultOutputPort() + { +@Override +public void setup(Context.PortContext context) +{ + outputClass = context.getValue(Context.PortContext.TUPLE_CLASS); +} + }; + + @InputPortFieldAnnotation(schemaRequired = true) + public transient DefaultInputPort input1 = new DefaultInputPort() + { +@Override +public void setup(Context.PortContext context) +{ + inputFieldObjects[0].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS); +} + +@Override +public void process(Object tuple) +{ + processTuple(tuple,true); +} + +@Override +public StreamCodec getStreamCodec() +{ + return getInnerJoinStreamCodec(true); +} + }; + + @InputPortFieldAnnotation(schemaRequired = true) + public transient DefaultInputPort input2 = new DefaultInputPort() + { +@Override +public void setup(Context.PortContext context) +{ + inputFieldObjects[1].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS); +} + +@Override +public void process(Object tuple) +{ + processTuple(tuple,false); +} + +@Override +public StreamCodec getStreamCodec() +{ + return getInnerJoinStreamCodec(false); +} + }; + + @Override + public void setup(Context.OperatorContext context) + { +timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) * + context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS); +super.setup(context); +for (int i = 0; i < 2; i++) { + inputFieldObjects[i] = new FieldObjectMap(); +} + } + + /** + * Extract the time value from the given tuple + * @param tuple given tuple + * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not. + * @return + */ + @Override + public long extractTime(Object tuple, boolean isStream1Data) + { +return timeFields == null ? time : (long)(isStream1Data ? inputFieldObje
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73289857 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java --- @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue; +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.hadoop.fs.Path; +import com.google.common.collect.Maps; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; + +/** + * An abstract implementation of inner join operator over Managed state which extends from + * AbstractInnerJoinOperator. + * + * Properties: + * noOfBuckets: Number of buckets required for Managed state. + * bucketSpanTime: Indicates the length of the time bucket. + */ +public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends AbstractInnerJoinOperator<K,T> implements +Operator.CheckpointNotificationListener, Operator.CheckpointListener,Operator.IdleTimeHandler +{ + private static final transient Logger LOG = LoggerFactory.getLogger(AbstractManagedStateInnerJoinOperator.class); + public static final String stateDir = "managedState"; + public static final String stream1State = "stream1Data"; + public static final String stream2State = "stream2Data"; + private transient long sleepMillis; + private transient Map<JoinEvent<K,T>, Future> waitingEvents = Maps.newLinkedHashMap(); + private int noOfBuckets = 1; + private Long bucketSpanTime; + protected ManagedTimeStateImpl stream1Store; + protected ManagedTimeStateImpl stream2Store; + + /** + * Create Managed states and stores for both the streams. 
+ */ + @Override + public void createStores() + { +stream1Store = new ManagedTimeStateImpl(); +stream2Store = new ManagedTimeStateImpl(); +stream1Store.setNumBuckets(noOfBuckets); +stream2Store.setNumBuckets(noOfBuckets); +if (bucketSpanTime != null) { + stream1Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime)); + stream2Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime)); +} + +if (getStream1ExpiryTime() != null) { + stream1Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getStream1ExpiryTime())); +} +if (getStream2ExpiryTime() != null) { + stream2Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getStream2ExpiryTime())); +} + +component = new ManagedSpillableComplexComponent(); +stream1Data = ((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream1Store, !isStream1KeyPrimary()); +stream2Data = ((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream2Store, !isStream2KeyPrimary()); + } + + /** + * Process the tuple which are received from input ports with the following steps: + * 1) Extract key from the given tuple + * 2) Insert <key,tuple> into the store where store is the stream1Data if the tuple + * receives from stream1 or viceversa. + * 3) Get the values of the key in asynchronous if found it in opposite store + * 4) If the future is done then Merge the given tupl
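The store setup in createStores() above is the main tuning surface of the managed-state join. A condensed configuration sketch using the same calls shown in the diff (the concrete values are illustrative only):

    import org.joda.time.Duration;
    import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;

    ManagedTimeStateImpl store = new ManagedTimeStateImpl();
    store.setNumBuckets(8);                        // noOfBuckets
    store.getTimeBucketAssigner()
        .setBucketSpan(Duration.millis(60000));    // bucketSpanTime: width of one time bucket
    store.getTimeBucketAssigner()
        .setExpireBefore(Duration.millis(300000)); // tuples older than this are purged and stop joining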
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73289845 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java --- @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue; +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.hadoop.fs.Path; +import com.google.common.collect.Maps; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; + +/** + * An abstract implementation of inner join operator over Managed state which extends from + * AbstractInnerJoinOperator. + * + * Properties: + * noOfBuckets: Number of buckets required for Managed state. + * bucketSpanTime: Indicates the length of the time bucket. + */ +public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends AbstractInnerJoinOperator<K,T> implements +Operator.CheckpointNotificationListener, Operator.CheckpointListener,Operator.IdleTimeHandler +{ + private static final transient Logger LOG = LoggerFactory.getLogger(AbstractManagedStateInnerJoinOperator.class); + public static final String stateDir = "managedState"; + public static final String stream1State = "stream1Data"; + public static final String stream2State = "stream2Data"; + private transient long sleepMillis; + private transient Map<JoinEvent<K,T>, Future> waitingEvents = Maps.newLinkedHashMap(); + private int noOfBuckets = 1; + private Long bucketSpanTime; + protected ManagedTimeStateImpl stream1Store; + protected ManagedTimeStateImpl stream2Store; + + /** + * Create Managed states and stores for both the streams. 
+ */ + @Override + public void createStores() + { +stream1Store = new ManagedTimeStateImpl(); +stream2Store = new ManagedTimeStateImpl(); +stream1Store.setNumBuckets(noOfBuckets); +stream2Store.setNumBuckets(noOfBuckets); +if (bucketSpanTime != null) { + stream1Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime)); + stream2Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime)); +} + +if (getStream1ExpiryTime() != null) { + stream1Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getStream1ExpiryTime())); +} +if (getStream2ExpiryTime() != null) { + stream2Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getStream2ExpiryTime())); +} + +component = new ManagedSpillableComplexComponent(); +stream1Data = ((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream1Store, !isStream1KeyPrimary()); +stream2Data = ((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream2Store, !isStream2KeyPrimary()); + } + + /** + * Process the tuple which are received from input ports with the following steps: + * 1) Extract key from the given tuple + * 2) Insert <key,tuple> into the store where store is the stream1Data if the tuple + * receives from stream1 or viceversa. + * 3) Get the values of the key in asynchronous if found it in opposite store + * 4) If the future is done then Merge the given tupl
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73289826 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java --- @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.Arrays; +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent; +import com.google.common.base.Preconditions; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.common.util.BaseOperator; + +/** + * + * An abstract implementation of inner join operator. Operator receives tuples from two streams, + * applies the join operation based on constraint and emit the joined value. + * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods. + * + * Properties: + * includeFieldStr: List of comma separated fields to be added to the output tuple. + * Ex: Field1,Field2;Field3,Field4 + * keyFields: List of comma separated key field for both the streams. Ex: Field1,Field2 + * timeFields: List of comma separated time field for both the streams. Ex: Field1,Field2 + * expiryTime: Expiry time for stored tuples + * isStream1KeyPrimary: : Specifies whether the stream1 key is primary or not + * isStream2KeyPrimary: : Specifies whether the stream2 key is primary or not + * + * Example: + * Left input port receives customer details and right input port receives Order details. + * Schema for the Customer be in the form of {ID, Name, CTime} + * Schema for the Order be in the form of {OID, CID, OTime} + * Now, Join the tuples of Customer and Order streams where Customer.ID = Order.CID and the constraint is + * matched tuples must have timestamp within 5 minutes. 
+ * Here, key Fields = ID, CID and Time Fields = CTime, OTime, expiryTime = 5 minutes + * + * @displayName Abstract Inner Join Operator + * @tags join + */ +public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator +{ + protected transient String[][] includeFields; + protected transient List keyFields; + protected transient List timeFields; + @AutoMetric + private long tuplesJoinedPerSec; + private double windowTimeSec; + private int tuplesCount; + @NotNull + private String keyFieldsStr; + @NotNull + private String includeFieldStr; + private String timeFieldsStr; + private Long stream1ExpiryTime; + private Long stream2ExpiryTime; + private boolean isStream1KeyPrimary = true; + private boolean isStream2KeyPrimary = true; + protected SpillableComplexComponent component; + protected Spillable.SpillableByteArrayListMultimap<K,T> stream1Data; + protected Spillable.SpillableByteArrayListMultimap<K,T> stream2Data; + + /** + * Process the tuple which are received from input ports with the following steps: + * 1) Extract key from the given tuple + * 2) Insert <key,tuple> into the store where store is the stream1Data if the tuple + * receives from stream1 or viceversa. + * 3) Get the values of the key if found it in opposite store + * 4) Merge the given tuple and values found from step (3) + * @param tuple given tuple + * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not. + */ + protected void processTuple(T tuple, boolean isStream1Data) + { +Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data; +K key = extractKey
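To make the customer/order example from the javadoc above concrete, here is a hedged wiring sketch; the setter names are assumptions derived from the property fields listed in the diff (keyFieldsStr, includeFieldStr, timeFieldsStr, stream1ExpiryTime, stream2ExpiryTime) and are not confirmed by the excerpt:

    POJOInnerJoinOperator join = new POJOInnerJoinOperator();
    join.setKeyFieldsStr("ID,CID");               // Customer.ID = Order.CID
    join.setIncludeFieldStr("ID,Name;OID,OTime"); // per-stream output fields, ';'-separated
    join.setTimeFieldsStr("CTime,OTime");         // timestamps compared for the join window
    join.setStream1ExpiryTime(5 * 60 * 1000L);    // the 5-minute constraint, per stream
    join.setStream2ExpiryTime(5 * 60 * 1000L);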
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73282037 --- Diff: library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java --- @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.lang.reflect.Array; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.ClassUtils; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.util.PojoUtils; + +/** + * Concrete implementation of AbstractManagedStateInnerJoinOperator and receives objects from both streams. 
+ * + * @displayName POJO Inner Join Operator + * @tags join + */ +public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator<Object,Object> implements Operator.ActivationListener +{ + private static final transient Logger LOG = LoggerFactory.getLogger(POJOInnerJoinOperator.class); + private transient long timeIncrement; + private transient FieldObjectMap[] inputFieldObjects = (FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2); + protected Class outputClass; + private long time = System.currentTimeMillis(); + + @OutputPortFieldAnnotation(schemaRequired = true) + public final transient DefaultOutputPort outputPort = new DefaultOutputPort() + { +@Override +public void setup(Context.PortContext context) +{ + outputClass = context.getValue(Context.PortContext.TUPLE_CLASS); +} + }; + + @InputPortFieldAnnotation(schemaRequired = true) + public transient DefaultInputPort input1 = new DefaultInputPort() + { +@Override +public void setup(Context.PortContext context) +{ + inputFieldObjects[0].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS); +} + +@Override +public void process(Object tuple) +{ + processTuple(tuple,true); +} + +@Override +public StreamCodec getStreamCodec() +{ + return getInnerJoinStreamCodec(true); +} + }; + + @InputPortFieldAnnotation(schemaRequired = true) + public transient DefaultInputPort input2 = new DefaultInputPort() + { +@Override +public void setup(Context.PortContext context) +{ + inputFieldObjects[1].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS); +} + +@Override +public void process(Object tuple) +{ + processTuple(tuple,false); +} + +@Override +public StreamCodec getStreamCodec() +{ + return getInnerJoinStreamCodec(false); +} + }; + + @Override + public void setup(Context.OperatorContext context) + { +timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) * + context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS); +super.setup(context); +for (int i = 0; i < 2; i++) { + inputFieldObjects[i] = new FieldObjectMap(); +} + } + + /** + * Extract the time value from the given tuple + * @param tuple given tuple + * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not. + * @return + */ + @Override + public long extractTime(Object tuple, boolean isStream1Data) + { +return timeFields == null ? time : (long)(isStream1Data ? inputFieldObje
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73282003 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java --- @@ -0,0 +1,357 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.managed; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multiset; + +import com.datatorrent.api.StreamCodec; +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; +import com.datatorrent.netlet.util.Slice; + +/** + * Concrete implementation of SpillableByteArrayListMultimap which is needed for join operator. + * + * Properties: + * isKeyContainsMultiValue: Specifies whether the key has multiple value or not. + * timeBucket: Specifies the lenght of the time bucket. + * + */ +public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableByteArrayListMultimap<K,V> +{ + private transient StreamCodec streamCodec = null; + private boolean isKeyContainsMultiValue = false; + private long timeBucket; + @NotNull + private ManagedTimeStateImpl store; + + public ManagedTimeStateMultiValue() + { +if (streamCodec == null) { + streamCodec = new KryoSerializableStreamCodec(); +} + } + + public ManagedTimeStateMultiValue(@NotNull ManagedTimeStateImpl store, boolean isPrimaryKey) + { +this(); +this.store = Preconditions.checkNotNull(store); +this.isKeyContainsMultiValue = isPrimaryKey; + } + + /** + * Return the list of values from the store + * @param k given key + * @return list of values + */ + @Override + public List get(@Nullable K k) + { +List value = null; +Slice valueSlice = store.getSync(getBucketId(k), streamCodec.toByteArray(k)); +if (isKeyContainsMultiValue) { + value = (List)streamCodec.fromByteArray(valueSlice); +} else { + if (valueSlice == null || valueSlice.length == 0 || valueSlice.buffer == null) { +return null; + } + value = new ArrayList<>(); + value.add((V)streamCodec.fromByteArray(valueSlice)); +} +return value; + } + + /** + * Returns the Future form the store. 
+ * @param k given key + * @return + */ + public CompositeFuture getAsync(@Nullable K k) + { +return new CompositeFuture(store.getAsync(getBucketId(k), streamCodec.toByteArray(k))); + } + + @Override + public Set keySet() + { +throw new UnsupportedOperationException(); + } + + @Override + public Multiset keys() + { +throw new UnsupportedOperationException(); + } + + @Override + public Collection values() + { +throw new UnsupportedOperationException(); + } + + @Override + public Collection<Map.Entry<K, V>> entries() + { +throw new UnsupportedOperationException(); + } + + @Override + public List removeAll(@Nullable Object o) + { +throw new UnsupportedOperationException(); + } + + @Override + public void clear() + { + + } + + @Override + public int size() + { +throw new UnsupportedOpe
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73281797 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java --- @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue; +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.hadoop.fs.Path; +import com.google.common.collect.Maps; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; + +/** + * An abstract implementation of inner join operator over Managed state which extends from + * AbstractInnerJoinOperator. + * + * Properties: + * noOfBuckets: Number of buckets required for Managed state. + * bucketSpanTime: Indicates the length of the time bucket. + */ +public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends AbstractInnerJoinOperator<K,T> implements +Operator.CheckpointNotificationListener, Operator.CheckpointListener,Operator.IdleTimeHandler +{ + private static final transient Logger LOG = LoggerFactory.getLogger(AbstractManagedStateInnerJoinOperator.class); + public static final String stateDir = "managedState"; + public static final String stream1State = "stream1Data"; + public static final String stream2State = "stream2Data"; + private transient long sleepMillis; + private transient Map<JoinEvent<K,T>, Future> waitingEvents = Maps.newLinkedHashMap(); + private int noOfBuckets = 1; + private Long bucketSpanTime; + protected ManagedTimeStateImpl stream1Store; + protected ManagedTimeStateImpl stream2Store; + + /** + * Create Managed states and stores for both the streams. + */ + @Override + public void createStores() + { +stream1Store = new ManagedTimeStateImpl(); +stream2Store = new ManagedTimeStateImpl(); +stream1Store.setNumBuckets(noOfBuckets); +stream2Store.setNumBuckets(noOfBuckets); +if (bucketSpanTime != null) { --- End diff -- Ok. Will make expiryTime as mandatory. I thought of different use case and this is invalid in streaming scenario. I will make single expiry time for both the streams. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
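For context, a minimal sketch of the change agreed above: one mandatory expiry time applied to both managed stores, using the ManagedTimeStateImpl and TimeBucketAssigner calls visible in the quoted diffs. The @NotNull expiryTime field stands in for whatever validation the final PR applies and is an assumption here.

    // Sketch: a single, mandatory expiry time shared by both stream stores.
    @NotNull
    private Long expiryTime; // milliseconds

    @Override
    public void createStores()
    {
      stream1Store = new ManagedTimeStateImpl();
      stream2Store = new ManagedTimeStateImpl();
      stream1Store.setNumBuckets(noOfBuckets);
      stream2Store.setNumBuckets(noOfBuckets);
      // One expiry time for both streams, as agreed in the review.
      stream1Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(expiryTime));
      stream2Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(expiryTime));
    }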
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73281761 --- Diff: library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java --- @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.lang.reflect.Array; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.ClassUtils; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.util.PojoUtils; + +/** + * Concrete implementation of AbstractManagedStateInnerJoinOperator and receives objects from both streams. 
+ * + * @displayName POJO Inner Join Operator + * @tags join + */ +public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator<Object,Object> implements Operator.ActivationListener +{ + private static final transient Logger LOG = LoggerFactory.getLogger(POJOInnerJoinOperator.class); + private transient long timeIncrement; + private transient FieldObjectMap[] inputFieldObjects = (FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2); + protected Class outputClass; + private long time = System.currentTimeMillis(); + + @OutputPortFieldAnnotation(schemaRequired = true) + public final transient DefaultOutputPort outputPort = new DefaultOutputPort() + { +@Override +public void setup(Context.PortContext context) +{ + outputClass = context.getValue(Context.PortContext.TUPLE_CLASS); +} + }; + + @InputPortFieldAnnotation(schemaRequired = true) + public transient DefaultInputPort input1 = new DefaultInputPort() + { +@Override +public void setup(Context.PortContext context) +{ + inputFieldObjects[0].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS); +} + +@Override +public void process(Object tuple) +{ + processTuple(tuple,true); +} + +@Override +public StreamCodec getStreamCodec() +{ + return getInnerJoinStreamCodec(true); +} + }; + + @InputPortFieldAnnotation(schemaRequired = true) + public transient DefaultInputPort input2 = new DefaultInputPort() + { +@Override +public void setup(Context.PortContext context) +{ + inputFieldObjects[1].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS); +} + +@Override +public void process(Object tuple) +{ + processTuple(tuple,false); +} + +@Override +public StreamCodec getStreamCodec() +{ + return getInnerJoinStreamCodec(false); +} + }; + + @Override + public void setup(Context.OperatorContext context) + { +timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) * + context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS); +super.setup(context); +for (int i = 0; i < 2; i++) { + inputFieldObjects[i] = new FieldObjectMap(); --- End diff -- No. In declaration, it creates the array with Map type. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
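A quick aside on the array point above: java.lang.reflect.Array.newInstance allocates only the array itself, so every slot starts out null and the elements must be constructed explicitly, which is why the loop in setup() is needed. A self-contained illustration:

    import java.lang.reflect.Array;

    public class ArrayInitDemo
    {
      public static void main(String[] args)
      {
        // The declaration creates an array of the given type...
        Object[] slots = (Object[])Array.newInstance(Object.class, 2);
        System.out.println(slots[0]); // prints "null"
        // ...but the elements themselves still need explicit construction.
        for (int i = 0; i < slots.length; i++) {
          slots[i] = new Object();
        }
        System.out.println(slots[0] != null); // prints "true"
      }
    }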
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73145124 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java --- @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.Arrays; +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent; +import com.google.common.base.Preconditions; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.common.util.BaseOperator; + +/** + * + * An abstract implementation of inner join operator. Operator receives tuples from two streams, + * applies the join operation based on constraint and emit the joined value. + * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods. + * + * Properties: + * includeFieldStr: List of comma separated fields to be added to the output tuple. + * Ex: Field1,Field2;Field3,Field4 + * keyFields: List of comma separated key field for both the streams. Ex: Field1,Field2 + * timeFields: List of comma separated time field for both the streams. Ex: Field1,Field2 + * expiryTime: Expiry time for stored tuples + * isStream1KeyPrimary: : Specifies whether the stream1 key is primary or not + * isStream2KeyPrimary: : Specifies whether the stream2 key is primary or not + * + * Example: + * Left input port receives customer details and right input port receives Order details. + * Schema for the Customer be in the form of {ID, Name, CTime} + * Schema for the Order be in the form of {OID, CID, OTime} + * Now, Join the tuples of Customer and Order streams where Customer.ID = Order.CID and the constraint is + * matched tuples must have timestamp within 5 minutes. 
+ * Here, key Fields = ID, CID and Time Fields = CTime, OTime, expiryTime = 5 minutes + * + * @displayName Abstract Inner Join Operator + * @tags join + */ +public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator +{ + protected transient String[][] includeFields; + protected transient List keyFields; + protected transient List timeFields; + @AutoMetric + private long tuplesJoinedPerSec; + private double windowTimeSec; + private int tuplesCount; + @NotNull + private String keyFieldsStr; + @NotNull + private String includeFieldStr; + private String timeFieldsStr; + private Long stream1ExpiryTime; + private Long stream2ExpiryTime; + private boolean isStream1KeyPrimary = true; + private boolean isStream2KeyPrimary = true; + protected SpillableComplexComponent component; + protected Spillable.SpillableByteArrayListMultimap<K,T> stream1Data; + protected Spillable.SpillableByteArrayListMultimap<K,T> stream2Data; + + /** + * Process the tuple which are received from input ports with the following steps: + * 1) Extract key from the given tuple + * 2) Insert <key,tuple> into the store where store is the stream1Data if the tuple + * receives from stream1 or viceversa. + * 3) Get the values of the key if found it in opposite store + * 4) Merge the given tuple and values found from step (3) + * @param tuple given tuple + * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not. + */ + protected void processTuple(T tuple, boolean isStream1Data) + { +Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data; +K key = extractKey
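Since the quoted diff is cut off above, here is an illustrative sketch of the four steps listed in the processTuple Javadoc. The mergeTuples argument order and the emitTuple helper are assumptions, not the PR's actual code.

    protected void processTuple(T tuple, boolean isStream1Data)
    {
      // (1) Extract the key and pick the store of the originating stream.
      Spillable.SpillableByteArrayListMultimap<K, T> store = isStream1Data ? stream1Data : stream2Data;
      K key = extractKey(tuple, isStream1Data);
      // (2) Insert <key, tuple> into that store.
      store.put(key, tuple);
      // (3) Look up the key in the opposite store.
      Spillable.SpillableByteArrayListMultimap<K, T> other = isStream1Data ? stream2Data : stream1Data;
      List<T> matches = other.get(key);
      // (4) Merge the tuple with every match found and emit the result.
      if (matches != null) {
        for (T match : matches) {
          T merged = isStream1Data ? mergeTuples(tuple, match) : mergeTuples(match, tuple);
          emitTuple(merged); // hypothetical emit helper
        }
      }
    }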
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73141502 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java --- @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.Arrays; +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent; +import com.google.common.base.Preconditions; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.common.util.BaseOperator; + +/** + * + * An abstract implementation of inner join operator. Operator receives tuples from two streams, + * applies the join operation based on constraint and emit the joined value. + * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods. + * + * Properties: + * includeFieldStr: List of comma separated fields to be added to the output tuple. + * Ex: Field1,Field2;Field3,Field4 + * keyFields: List of comma separated key field for both the streams. Ex: Field1,Field2 + * timeFields: List of comma separated time field for both the streams. Ex: Field1,Field2 --- End diff -- Yes. It can be. If the timeField of the stream is not in milliseconds then the user has to override the extractTime() method and convert the field into milliseconds. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
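A minimal sketch of the override described above, assuming the extractTime(tuple, isStream1Data) shape visible in the quoted code and a hypothetical POJO whose time field is in epoch seconds:

    @Override
    public long extractTime(Object tuple, boolean isStream1Data)
    {
      // getEventTimeSeconds() is a hypothetical accessor on the incoming POJO.
      long seconds = ((MyEvent)tuple).getEventTimeSeconds();
      return seconds * 1000L; // the operator expects milliseconds
    }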
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73141529 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java --- @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.Arrays; +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent; +import com.google.common.base.Preconditions; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.common.util.BaseOperator; + +/** + * + * An abstract implementation of inner join operator. Operator receives tuples from two streams, + * applies the join operation based on constraint and emit the joined value. + * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods. + * + * Properties: + * includeFieldStr: List of comma separated fields to be added to the output tuple. + * Ex: Field1,Field2;Field3,Field4 + * keyFields: List of comma separated key field for both the streams. Ex: Field1,Field2 + * timeFields: List of comma separated time field for both the streams. Ex: Field1,Field2 + * expiryTime: Expiry time for stored tuples + * isStream1KeyPrimary: : Specifies whether the stream1 key is primary or not + * isStream2KeyPrimary: : Specifies whether the stream2 key is primary or not + * + * Example: + * Left input port receives customer details and right input port receives Order details. + * Schema for the Customer be in the form of {ID, Name, CTime} + * Schema for the Order be in the form of {OID, CID, OTime} + * Now, Join the tuples of Customer and Order streams where Customer.ID = Order.CID and the constraint is + * matched tuples must have timestamp within 5 minutes. 
+ * Here, key Fields = ID, CID and Time Fields = CTime, OTime, expiryTime = 5 minutes + * + * @displayName Abstract Inner Join Operator + * @tags join + */ +public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator +{ + protected transient String[][] includeFields; + protected transient List keyFields; + protected transient List timeFields; + @AutoMetric + private long tuplesJoinedPerSec; + private double windowTimeSec; + private int tuplesCount; + @NotNull + private String keyFieldsStr; + @NotNull + private String includeFieldStr; + private String timeFieldsStr; + private Long stream1ExpiryTime; + private Long stream2ExpiryTime; + private boolean isStream1KeyPrimary = true; + private boolean isStream2KeyPrimary = true; + protected SpillableComplexComponent component; + protected Spillable.SpillableByteArrayListMultimap<K,T> stream1Data; + protected Spillable.SpillableByteArrayListMultimap<K,T> stream2Data; + + /** + * Process the tuple which are received from input ports with the following steps: + * 1) Extract key from the given tuple + * 2) Insert <key,tuple> into the store where store is the stream1Data if the tuple + * receives from stream1 or viceversa. + * 3) Get the values of the key if found it in opposite store + * 4) Merge the given tuple and values found from step (3) + * @param tuple given tuple + * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not. + */ + protected void processTuple(T tuple, boolean isStream1Data) + { +Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data; +K key = extractKey
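The Customer/Order example from the Javadoc above maps to property settings roughly like the following; the setter names are assumptions derived from the documented property names, and the expiry time is given in milliseconds.

    POJOInnerJoinOperator join = dag.addOperator("join", new POJOInnerJoinOperator());
    join.setKeyFieldsStr("ID,CID");         // Customer.ID joins Order.CID
    join.setTimeFieldsStr("CTime,OTime");   // time fields of the two streams
    join.setIncludeFieldStr("ID,Name;OID"); // fields copied into the output tuple
    join.setExpiryTime(5 * 60 * 1000L);     // 5 minutes, in milliseconds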
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73141506 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java --- @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.Arrays; +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent; +import com.google.common.base.Preconditions; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.common.util.BaseOperator; + +/** + * + * An abstract implementation of inner join operator. Operator receives tuples from two streams, + * applies the join operation based on constraint and emit the joined value. + * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods. + * + * Properties: + * includeFieldStr: List of comma separated fields to be added to the output tuple. + * Ex: Field1,Field2;Field3,Field4 + * keyFields: List of comma separated key field for both the streams. Ex: Field1,Field2 + * timeFields: List of comma separated time field for both the streams. Ex: Field1,Field2 + * expiryTime: Expiry time for stored tuples --- End diff -- ms. Will add it into java docs. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73141485 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java --- @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.Arrays; +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent; +import com.google.common.base.Preconditions; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.common.util.BaseOperator; + +/** + * + * An abstract implementation of inner join operator. Operator receives tuples from two streams, + * applies the join operation based on constraint and emit the joined value. + * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods. + * + * Properties: + * includeFieldStr: List of comma separated fields to be added to the output tuple. + * Ex: Field1,Field2;Field3,Field4 + * keyFields: List of comma separated key field for both the streams. Ex: Field1,Field2 --- End diff -- Yes. If there are multiple key fields for each stream, then the primary key field has to be specified in keyFields, and the remaining key fields have to be handled in mergeTuples(). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
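A hedged sketch of the pattern just described: the primary key drives the store lookup, and any secondary key constraint is enforced while merging. The Customer/Order types, their accessors, and the mergeTuples signature here are illustrative assumptions.

    @Override
    public Object mergeTuples(Object customer, Object order)
    {
      // Hypothetical secondary key: region codes must also match.
      if (!((Customer)customer).getRegion().equals(((Order)order).getRegion())) {
        return null; // non-matching pairs are dropped by the caller
      }
      CustomerOrder out = new CustomerOrder(); // hypothetical output POJO
      out.setId(((Customer)customer).getId());
      out.setOid(((Order)order).getOid());
      return out;
    }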
[GitHub] apex-core issue #362: APEXCORE-494 Fixed the dynamic partition issue in case...
Github user chaithu14 commented on the issue: https://github.com/apache/apex-core/pull/362 This fix is related to dynamic partitioning, and the re-partitioning logic lives in redoPartitions(). Thanks Thomas, it's a good suggestion. I will revisit it and will discuss with you before fixing it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] apex-core pull request #362: APEXCORE-494 Fixed the dynamic partition issue ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-core/pull/362#discussion_r72926469 --- Diff: engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java --- @@ -962,8 +963,11 @@ private void redoPartitions(PMapping currentMapping, String note) mainPC.operatorIdToPartition.put(p.getId(), newPartition); } +// Get the deepest down stream mapping +PMapping downStreamMapping = currentMapping; --- End diff -- If the partitionContexts LinkedHashMap exists, then downStreamMapping is the last element of the hashmap; otherwise downStreamMapping is the currentMapping. Let's say the physical DAG before repartitioning is A1 -> B1 -> C1 -> D1, with parallel partitioning between A -> B and B -> C. If A is repartitioned to 3, then downStreamMapping would be operator C's mapping. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
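The lookup described in the comment, taking the last value of the partitionContexts LinkedHashMap when it is non-empty and otherwise falling back to currentMapping, might be sketched as below; the exact type and contents of partitionContexts are assumptions for illustration.

    // Sketch: resolve the deepest downstream mapping.
    PMapping downStreamMapping = currentMapping;
    if (partitionContexts != null && !partitionContexts.isEmpty()) {
      // A LinkedHashMap preserves insertion order, so the last value
      // corresponds to the deepest parallel-partitioned downstream operator.
      for (PMapping mapping : partitionContexts.values()) {
        downStreamMapping = mapping;
      }
    }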
[GitHub] apex-core issue #362: APEXCORE-494 Fixed the dynamic partition issue in case...
Github user chaithu14 commented on the issue: https://github.com/apache/apex-core/pull/362 @vrozov Please review and merge. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] apex-core pull request #362: APEXCORE-494 Fixed the dynamic partition issue ...
GitHub user chaithu14 opened a pull request: https://github.com/apache/apex-core/pull/362 APEXCORE-494 Fixed the dynamic partition issue in case of initial partition size is 1 You can merge this pull request into a Git repository by running: $ git pull https://github.com/chaithu14/incubator-apex-core APEXCORE-494-DPartIssue Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-core/pull/362.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #362 commit 3372ad70b63c7086483e6815315089e98703f989 Author: Chaitanya <chaita...@datatorrent.com> Date: 2016-07-26T12:09:52Z APEXCORE-494 Fixed the dynamic partition issue in case of initial partition size is 1 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] apex-core issue #359: APEXCORE-494 Fixed the dynamic partition issue in case...
Github user chaithu14 commented on the issue: https://github.com/apache/apex-core/pull/359 Thanks @vrozov @amberarrow. I will look into the JIRA and will update this PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] apex-core issue #359: APEXCORE-494 Fixed the dynamic partition issue in case...
Github user chaithu14 commented on the issue: https://github.com/apache/apex-core/pull/359 @vrozov Can definePartitions() return the current partition? If yes, then there might be an issue in redoPartitions(). From the above example, for the logical operator C, the current partition size is 2 and the new partition size is 3, which includes the current partitions {C1, C2}. The output port of C1 (or C2) maps to the input port of the unifier. Suppose the physical DAG is initially A1->B1->C1->Aggregator, with a current partition size of 1. If A is repartitioned to 2, the physical DAG becomes A1->B1->C1-> U -> Aggregator and A2->B2->C2-> U, with a new partition size of 2. In the initial launch the output of C1 maps to the input of the Aggregator, and after repartitioning the output of C1 maps to the unifier. The mapping of the current partition (A1) has therefore changed after repartitioning. I think this case is not covered in redoPartitions(). Please correct me if I am wrong. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
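For reference, the scenario above arises from a Partitioner whose definePartitions() keeps the existing partition while adding new ones, so a unifier appears only after repartitioning. A minimal method sketch against the Partitioner contract; MyOperator is a placeholder type.

    @Override
    public Collection<Partition<MyOperator>> definePartitions(
        Collection<Partition<MyOperator>> partitions, PartitioningContext context)
    {
      // Keep the current partition(s) and add one more.
      List<Partition<MyOperator>> newPartitions = new ArrayList<>(partitions);
      newPartitions.add(new DefaultPartition<>(new MyOperator()));
      // Size grows from 1 to 2, so downstream ports now connect through a unifier.
      return newPartitions;
    }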
[GitHub] apex-core pull request #359: APEXCORE-494 Fixed the dynamic partition issue ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-core/pull/359#discussion_r71739128 --- Diff: engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java --- @@ -895,8 +895,10 @@ private void redoPartitions(PMapping currentMapping, String note) addedPartitions.add(newPartition); } else { // check whether mapping was changed +int currentPartitionsSize = mainPC.currentPartitions.size(); for (DefaultPartition pi : mainPC.currentPartitions) { - if (pi == newPartition && pi.isModified()) { + if (pi == newPartition && (pi.isModified() || --- End diff -- @vrozov The problem occurs when definePartitions() returns a list of partitions (size > 1) that includes the current partition of size 1. The unifier comes into the picture in newPartitions, which was not there in the initial launch. Here, the port mapping of the current partition, which is in mainPC.newPartitions, has to be changed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] apex-core pull request #359: APEXCORE-494 Fixed the dynamic partition issue ...
GitHub user chaithu14 opened a pull request: https://github.com/apache/apex-core/pull/359 APEXCORE-494 Fixed the dynamic partition issue in case of initial partition size is 1 You can merge this pull request into a Git repository by running: $ git pull https://github.com/chaithu14/incubator-apex-core APEXCORE-494-DPartIssue Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-core/pull/359.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #359 commit c4578818fb970580310cff1d3a24d6884645c233 Author: Chaitanya <chaita...@datatorrent.com> Date: 2016-07-19T11:42:48Z APEXCORE-494 Fixed the dynamic partition issue in case of initial partition size is 1 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] apex-malhar pull request #346: APEXMALHAR-2158 Fixed the duplication of mess...
GitHub user chaithu14 opened a pull request: https://github.com/apache/apex-malhar/pull/346 APEXMALHAR-2158 Fixed the duplication of messages emitted issue when the Kafka Input operator redeployed You can merge this pull request into a Git repository by running: $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2158-Dpdata-recovery Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-malhar/pull/346.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #346 commit 96e0535e9c934e18ae10d71b9ee9fe9c5b9d6805 Author: Chaitanya <chaita...@datatorrent.com> Date: 2016-07-18T11:27:04Z APEXMALHAR-2158 Fixed the duplication of messages emitted issue when the Kafka Input operator redeployed --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] apex-malhar pull request #330: Initial cut of Inner Join operator for REVIEW...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r70789750 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java --- @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl; +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.hadoop.fs.Path; + +import com.google.common.collect.Maps; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.join.managed.ManagedSpillableComplexComponent; +import com.datatorrent.lib.join.managed.ManagedTimeStateMultiMap; +import com.datatorrent.netlet.util.Slice; + +public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends AbstractInnerJoinOperator<K,T> implements +Operator.CheckpointNotificationListener, Operator.CheckpointListener,Operator.IdleTimeHandler +{ + private static final transient Logger LOG = LoggerFactory.getLogger(AbstractManagedStateInnerJoinOperator.class); + public static final String stateDir = "managedState"; + public static final String stream1State = "stream1Data"; + public static final String stream2State = "stream2Data"; + private transient long sleepMillis; + private transient Map<JoinEvent<K,T>, Future> waitingEvents = Maps.newLinkedHashMap(); + private int noOfBuckets = 1; + private Long bucketSpanTime; + protected ManagedTimeStateImpl stream1Store; + protected ManagedTimeStateImpl stream2Store; + + @Override + public void createStores() + { +stream1Store = new ManagedTimeStateImpl(); +stream2Store = new ManagedTimeStateImpl(); +stream1Store.setNumBuckets(noOfBuckets); +stream2Store.setNumBuckets(noOfBuckets); +if (bucketSpanTime != null) { + stream1Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime)); + stream2Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime)); +} + +if (getExpiryTime() != null) { + stream1Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime())); + stream2Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime())); +} + +component = new ManagedSpillableComplexComponent(); +stream1Data = 
((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream1Store, isStream1KeyPrimary()); +stream2Data = ((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream2Store, isStream2KeyPrimary()); + } + + @Override + protected void processTuple(T tuple, boolean isStream1Data) + { +Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data; +K key = extractKey(tuple,isStream1Data); +long timeBucket = extractTime(tuple,isStream1Data); +((ManagedTimeStateMultiMap)store).put(key, tuple,timeBucket); +joinStream(key,tuple,isStream1Data); + } + + @Override + protected void joinStream(K key, T tuple, boolean isStream1Data) + { +Spillable.SpillableByteArrayListMultimap<K, T> store = isStream1Data ? stream2Data : stream1Data; +Future future = ((ManagedTimeStateMultiMap)store).getAsync(ke
[GitHub] apex-malhar pull request #334: APEXMALHAR-2136 1) Fixed the null pointer exc...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/334#discussion_r69947853 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java --- @@ -536,6 +538,16 @@ public void setDurationPreventingFreeingSpace(Duration durationPreventingFreeing this.durationPreventingFreeingSpace = durationPreventingFreeingSpace; } + public IncrementalCheckpointManager getCheckpointManager() + { +return checkpointManager; + } + + public void setCheckpointManager(IncrementalCheckpointManager checkpointManager) --- End diff -- Updated as per the comment. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] apex-malhar pull request #334: APEXMALHAR-2136 1) Fixed the null pointer exc...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/334#discussion_r69860038 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java --- @@ -203,7 +203,12 @@ public void setup(OperatorContext context) //delete all the wal files with windows > activationWindow. //All the wal files with windows <= activationWindow are loaded and kept separately as recovered data. try { -for (long recoveredWindow : checkpointManager.getWindowIds(operatorContext.getId())) { +long[] recoveredWindows = checkpointManager.getWindowIds(operatorContext.getId()); +if (recoveredWindows == null) { + readerService = Executors.newFixedThreadPool(numReaders, new NameableThreadFactory("managedStateReaders")); + return; +} +for (long recoveredWindow : recoveredWindows) { --- End diff -- Done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] apex-malhar pull request #334: APEXMALHAR-2136 1) Fixed the null pointer exc...
GitHub user chaithu14 opened a pull request: https://github.com/apache/apex-malhar/pull/334 APEXMALHAR-2136 1) Fixed the null pointer exception issue. 2) Added getter and setter for IncrementalCheckpointManager You can merge this pull request into a Git repository by running: $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2136-NPE-RecoverWindows Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-malhar/pull/334.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #334 commit 15b4858b72955ff2ce83e1817ac3f64c15347106 Author: Chaitanya <chaita...@datatorrent.com> Date: 2016-07-07T05:42:45Z APEXMALHAR-2136 1) Fixed the null pointer exception issue. 2) Added getter and setter for IncrementalCheckpointManager --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] apex-malhar pull request #330: Initial cut of Inner Join operator for REVIEW...
GitHub user chaithu14 opened a pull request: https://github.com/apache/apex-malhar/pull/330 Initial cut of Inner Join operator for REVIEW only You can merge this pull request into a Git repository by running: $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2100-InnerJoin Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-malhar/pull/330.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #330 commit 6987dce47a5789e8de578ae3ee77c2cf6507fcc0 Author: Chaitanya <chaita...@datatorrent.com> Date: 2016-07-05T12:14:46Z Initial cut of Inner Join operator for REVIEW only --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r68926798 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java --- @@ -0,0 +1,241 @@ +package org.apache.apex.malhar.lib.state.spillable; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; +import org.apache.apex.malhar.lib.utils.serde.SliceUtils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multiset; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * Created by tfarkas on 6/12/16. + */ +public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>, +Spillable.SpillableComponent +{ + public static final int DEFAULT_BATCH_SIZE = 1000; + public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0}; + + private int batchSize = DEFAULT_BATCH_SIZE; + + private WindowBoundedMapCache<K, SpillableArrayListImpl> cache = new WindowBoundedMapCache<>(); + + @NotNull + private SpillableByteMapImpl<byte[], Integer> map; + + private SpillableStateStore store; + private byte[] identifier; + private long bucket; + private Serde<K, Slice> serdeKey; + private Serde<V, Slice> serdeValue; + + public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket, + Serde<K, Slice> serdeKey, + Serde<V, Slice> serdeValue) + { +this.store = Preconditions.checkNotNull(store); +this.identifier = Preconditions.checkNotNull(identifier); +this.bucket = bucket; +this.serdeKey = Preconditions.checkNotNull(serdeKey); +this.serdeValue = Preconditions.checkNotNull(serdeValue); + +map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice()); + } + + @Override + public List get(@Nullable K key) + { +return getHelper(key); + } + --- End diff -- I think, we need to provide the asynchronous get method also. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
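The asynchronous lookup suggested above already has an analogue in the managed-state join code quoted earlier in this thread (getAsync returning a Future). A sketch of a non-blocking caller; the JoinEvent constructor arguments are an assumption.

    Future<List> future = ((ManagedTimeStateMultiMap)store).getAsync(key);
    if (future.isDone()) {
      try {
        List values = future.get(); // result is already available
        // ... merge and emit immediately ...
      } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
      }
    } else {
      // Defer the event and drain it later, e.g. from handleIdleTime().
      waitingEvents.put(new JoinEvent<>(key, tuple), future);
    }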
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r68926769 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java --- @@ -0,0 +1,241 @@ +package org.apache.apex.malhar.lib.state.spillable; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; +import org.apache.apex.malhar.lib.utils.serde.SliceUtils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multiset; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * Created by tfarkas on 6/12/16. + */ +public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>, +Spillable.SpillableComponent +{ + public static final int DEFAULT_BATCH_SIZE = 1000; + public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0}; + + private int batchSize = DEFAULT_BATCH_SIZE; + + private WindowBoundedMapCache<K, SpillableArrayListImpl> cache = new WindowBoundedMapCache<>(); + + @NotNull + private SpillableByteMapImpl<byte[], Integer> map; + + private SpillableStateStore store; + private byte[] identifier; + private long bucket; + private Serde<K, Slice> serdeKey; --- End diff -- Why do we need to configure the bucket? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r68560022 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java --- @@ -0,0 +1,241 @@ +package org.apache.apex.malhar.lib.state.spillable; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; +import org.apache.apex.malhar.lib.utils.serde.SliceUtils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multiset; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * Created by tfarkas on 6/12/16. + */ +public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>, +Spillable.SpillableComponent +{ + public static final int DEFAULT_BATCH_SIZE = 1000; + public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0}; + + private int batchSize = DEFAULT_BATCH_SIZE; + + private WindowBoundedMapCache<K, SpillableArrayListImpl> cache = new WindowBoundedMapCache<>(); + + @NotNull + private SpillableByteMapImpl<byte[], Integer> map; + + private SpillableStateStore store; + private byte[] identifier; + private long bucket; + private Serde<K, Slice> serdeKey; + private Serde<V, Slice> serdeValue; + + public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket, + Serde<K, Slice> serdeKey, + Serde<V, Slice> serdeValue) + { +this.store = Preconditions.checkNotNull(store); +this.identifier = Preconditions.checkNotNull(identifier); +this.bucket = bucket; +this.serdeKey = Preconditions.checkNotNull(serdeKey); +this.serdeValue = Preconditions.checkNotNull(serdeValue); + +map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice()); + } + + @Override + public List get(@Nullable K key) + { +return getHelper(key); + } + + private SpillableArrayListImpl getHelper(@Nullable K key) + { +SpillableArrayListImpl spillableArrayList = cache.get(key); + +if (spillableArrayList == null) { + Slice keyPrefix = serdeKey.serialize(key); + Integer size = map.get(SliceUtils.concatenate(keyPrefix, SIZE_KEY_SUFFIX)); + --- End diff -- Accessing the value from map should be as follows: map.get(SliceUtils.concatenate().buffer) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
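Spelled out, the suggestion above is that because the size map is declared as SpillableByteMapImpl<byte[], Integer>, the lookup must pass the raw byte array backing the concatenated Slice rather than the Slice itself:

    Slice keyPrefix = serdeKey.serialize(key);
    // Pass the backing byte[] of the freshly concatenated Slice, not the Slice.
    Integer size = map.get(SliceUtils.concatenate(keyPrefix, SIZE_KEY_SUFFIX).buffer);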
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r68559313 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java --- @@ -0,0 +1,52 @@ +package org.apache.apex.malhar.lib.utils.serde; + +import com.datatorrent.netlet.util.Slice; + +public class SliceUtils +{ + private SliceUtils() + { + } + + public static byte[] concatenate(byte[] a, byte[] b) + { +byte[] output = new byte[a.length + b.length]; + +System.arraycopy(a, 0, output, 0, a.length); +System.arraycopy(b, 0, output, a.length, b.length); +return output; + } + + public static Slice concatenate(Slice a, Slice b) + { +int size = a.length + b.length; +byte[] bytes = new byte[size]; + +System.arraycopy(a.buffer, a.offset, bytes, 0, a.length); +System.arraycopy(b.buffer, b.offset, bytes, a.length, b.length); + +return new Slice(bytes); + } + + public static Slice concatenate(byte[] a, Slice b) + { +int size = a.length + b.length; +byte[] bytes = new byte[size]; + +System.arraycopy(a, 0, bytes, 0, a.length); +System.arraycopy(b.buffer, b.offset, bytes, a.length, b.length); + +return new Slice(bytes); + } + + public static Slice concatenate(Slice a, byte[] b) + { +int size = a.length + b.length; +byte[] bytes = new byte[size]; + +System.arraycopy(a, a.offset, bytes, 0, a.length); --- End diff -- a.buffer instead of a. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
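Applied to the method quoted above, the corrected copy reads as follows (only the first arraycopy argument changes):

    public static Slice concatenate(Slice a, byte[] b)
    {
      int size = a.length + b.length;
      byte[] bytes = new byte[size];
      // Fix from the review: copy from the backing array a.buffer, not from the Slice itself.
      System.arraycopy(a.buffer, a.offset, bytes, 0, a.length);
      System.arraycopy(b, 0, bytes, a.length, b.length);
      return new Slice(bytes);
    }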
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r68559266 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java --- @@ -0,0 +1,191 @@ +package org.apache.apex.malhar.lib.state.spillable; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.SliceUtils; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.mutable.MutableInt; + +import com.google.common.base.Preconditions; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +public class SpillableByteMapImpl<K, V> implements Spillable.SpillableByteMap<K, V>, Spillable.SpillableComponent +{ + @NotNull + private SpillableStateStore store; + @NotNull + private byte[] identifier; + private long bucket; + @NotNull + private Serde<K, Slice> serdeKey; + @NotNull + private Serde<V, Slice> serdeValue; + + private int size = 0; + + private transient WindowBoundedMapCache<K, V> cache = new WindowBoundedMapCache<>(); + private transient MutableInt tempOffset = new MutableInt(); + + private SpillableByteMapImpl() + { +//for kryo + } + + public SpillableByteMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K, Slice> serdeKey, + Serde<V, Slice> serdeValue) + { +this.store = Preconditions.checkNotNull(store); +this.identifier = Preconditions.checkNotNull(identifier); +this.bucket = bucket; +this.serdeKey = Preconditions.checkNotNull(serdeKey); +this.serdeValue = Preconditions.checkNotNull(serdeValue); + } + + @Override + public int size() + { +return size; + } + + @Override + public boolean isEmpty() + { +return size == 0; + } + + @Override + public boolean containsKey(Object o) + { +return get(o) != null; + } + + @Override + public boolean containsValue(Object o) + { +throw new UnsupportedOperationException(); + } + + @Override + public V get(Object o) + { +K key = (K)o; + +if (cache.getRemovedKeys().contains(key)) { + return null; +} + +V val = cache.get(key); + +if (val != null) { + return val; +} + +Slice valSlice = store.getSync(bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key))); + +if (valSlice == null || valSlice == BucketedState.EXPIRED || valSlice.length == 0) { + return null; +} + +tempOffset.setValue(valSlice.offset + identifier.length); +return serdeValue.deserialize(valSlice, tempOffset); + } + + @Override + public V put(K k, V v) + { +V value = get(k); + +if (value == null) { + size++; +} + +cache.put(k, v); + +return value; + } + + @Override + public V remove(Object o) + { +V value = get(o); + +if (value != null) { + size--; +} + +cache.remove((K)o); + +return value; + } + + @Override + public void putAll(Map map) + { +for (Map.Entry entry : map.entrySet()) { + put(entry.getKey(), entry.getValue()); +} + } + + @Override + public void clear() + { +throw new UnsupportedOperationException(); + } + + @Override + public Set keySet() + { +throw new UnsupportedOperationException(); + } + + @Override + public Collection values() + { +throw new UnsupportedOperationException(); + } + + @Override + public Set<Entry<K, V>> entrySet() + { +throw new UnsupportedOperationException(); + } + + @Override + public void setup(Context.OperatorContext context) + { + } --- End diff -- I think store.setup() has to be called 
in the setup method, because SpillableStateStore extends the Component interface. The same applies to the other lifecycle methods: beginWindow, endWindow, and teardown. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
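A sketch of the delegation the comment asks for, with the spillable structure forwarding each lifecycle callback to its store; whether the final PR wires the store up here or in an enclosing component is not shown in this thread.

    @Override
    public void setup(Context.OperatorContext context)
    {
      store.setup(context); // the store is a Component, so it needs the same lifecycle calls
    }

    @Override
    public void beginWindow(long windowId)
    {
      store.beginWindow(windowId);
    }

    @Override
    public void endWindow()
    {
      store.endWindow();
    }

    @Override
    public void teardown()
    {
      store.teardown();
    }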
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r68559252

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---
@@ -0,0 +1,241 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/12/16.
+ */
+public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>,
+    Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+
+  private WindowBoundedMapCache<K, SpillableArrayListImpl> cache = new WindowBoundedMapCache<>();
+
+  @NotNull
+  private SpillableByteMapImpl<byte[], Integer> map;
+
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde<K, Slice> serdeKey;
+  private Serde<V, Slice> serdeValue;
+
+  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
+      Serde<K, Slice> serdeKey,
+      Serde<V, Slice> serdeValue)
+  {
+    this.store = Preconditions.checkNotNull(store);
+    this.identifier = Preconditions.checkNotNull(identifier);
+    this.bucket = bucket;
+    this.serdeKey = Preconditions.checkNotNull(serdeKey);
+    this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
+  }
+
+  @Override
+  public List get(@Nullable K key)
+  {
+    return getHelper(key);
+  }
+
+  private SpillableArrayListImpl getHelper(@Nullable K key)
+  {
+    SpillableArrayListImpl spillableArrayList = cache.get(key);
+
+    if (spillableArrayList == null) {
+      Slice keyPrefix = serdeKey.serialize(key);
+      Integer size = map.get(SliceUtils.concatenate(keyPrefix, SIZE_KEY_SUFFIX));
+
+      if (size == null) {
+        return null;
+      }
+
+      spillableArrayList = new SpillableArrayListImpl(bucket, keyPrefix.buffer, store, serdeValue);
+      spillableArrayList.setSize(size);
+
+      cache.put(key, spillableArrayList);
+    }
+
+    return spillableArrayList;
+  }
+
+  @Override
+  public Set keySet()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Multiset keys()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection values()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<Map.Entry<K, V>> entries()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List removeAll(@Nullable Object key)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void clear()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int size()
+  {
+    return map.size();
+  }
+
+  @Override
+  public boolean isEmpty()
+  {
+    return map.isEmpty();
+  }
+
+  @Override
+  public boolean containsKey(@Nullable Object key)
+  {
+    return map.containsKey(SliceUtils.concatenate(serdeKey.serialize((K)key), SIZE_KEY_SUFFIX));
+  }
+
+  @Override
+  public boolean containsValue(@Nullable Object value)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean containsEntry(@Nullable Object key, @Nullable Object value)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean put(@Nullable K key, @Nullable V value)
+  {
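The diff above stores each multimap key's list length in the backing spillable map under serialize(key) ++ SIZE_KEY_SUFFIX, which lets get() and containsKey() distinguish an absent key from an empty list. A minimal standalone sketch of that scheme, with a plain HashMap and a hard-coded UTF-8 key standing in for SpillableByteMapImpl and the real serdes:

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the size-key scheme from the diff above: the number of
// values stored under a multimap key K is kept in the backing map under
// serialize(K) ++ SIZE_KEY_SUFFIX. A plain HashMap (keyed on a printable form
// of the byte key) stands in for SpillableByteMapImpl here.
public class SizeKeyDemo
{
  static final byte[] SIZE_KEY_SUFFIX = new byte[]{0, 0, 0};

  static byte[] concatenate(byte[] a, byte[] b)
  {
    byte[] out = Arrays.copyOf(a, a.length + b.length);
    System.arraycopy(b, 0, out, a.length, b.length);
    return out;
  }

  public static void main(String[] args)
  {
    Map<String, Integer> map = new HashMap<>();

    byte[] keyPrefix = "user42".getBytes(StandardCharsets.UTF_8); // stand-in for the serde-encoded key
    byte[] sizeKey = concatenate(keyPrefix, SIZE_KEY_SUFFIX);

    map.put(Arrays.toString(sizeKey), 3); // the list under "user42" currently holds 3 elements

    // get()/containsKey() consult the size entry first; a missing entry
    // distinguishes an absent key from an empty list.
    Integer size = map.get(Arrays.toString(sizeKey));
    System.out.println(size == null ? "no such key" : "list size = " + size);
  }
}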
[GitHub] incubator-apex-malhar pull request #300: APEXMALHAR-2103 Fixed the scanner i...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/300#discussion_r65475558

--- Diff: library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java ---
@@ -375,11 +375,14 @@ public void run()
       lastScannedInfo = null;
       numDiscoveredPerIteration = 0;
       for (String afile : files) {
-        String filePath = new File(afile).getAbsolutePath();
+        Path filePath = new Path(afile);
         LOG.debug("Scan started for input {}", filePath);
-        Map<String, Long> lastModifiedTimesForInputDir;
-        lastModifiedTimesForInputDir = referenceTimes.get(filePath);
-        scan(new Path(afile), null, lastModifiedTimesForInputDir);
+        Map<String, Long> lastModifiedTimesForInputDir = null;
+        if (fs.exists(filePath)) {
+          FileStatus fileStatus = fs.getFileStatus(filePath);
+          lastModifiedTimesForInputDir = referenceTimes.get(fileStatus.getPath().toUri().getPath());
--- End diff --

No. In createScannedFileInfo(), the directory/file path is specified as an absolute path. But in the case of the local file system, the entries in files might be relative paths.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
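The comment above turns on path normalization: a relative entry in files and the absolute key written by createScannedFileInfo() will not match as map keys unless both are resolved through the FileSystem first. A small sketch of that resolution, assuming a local directory named data/in exists (the name is a placeholder, not from the PR):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrates the mismatch the fix addresses: a relative input such as
// "data/in" is not equal to the absolute key that createScannedFileInfo()
// stores, so referenceTimes.get(...) would miss. Resolving the path through
// FileSystem first normalizes it to the same absolute, scheme-free form.
public class PathKeyDemo
{
  public static void main(String[] args) throws Exception
  {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path relative = new Path("data/in"); // placeholder directory; must exist for getFileStatus()

    System.out.println("raw key:        " + relative);
    if (fs.exists(relative)) {
      FileStatus status = fs.getFileStatus(relative);
      // Normalized absolute path without the scheme; matches the stored key.
      System.out.println("normalized key: " + status.getPath().toUri().getPath());
    }
  }
}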
[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2103 Fixed the scanner issue ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/300#discussion_r65303185

--- Diff: library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java ---
@@ -375,11 +374,18 @@ public void run()
       lastScannedInfo = null;
       numDiscoveredPerIteration = 0;
       for (String afile : files) {
-        String filePath = new File(afile).getAbsolutePath();
-        LOG.debug("Scan started for input {}", filePath);
-        Map<String, Long> lastModifiedTimesForInputDir;
-        lastModifiedTimesForInputDir = referenceTimes.get(filePath);
-        scan(new Path(afile), null, lastModifiedTimesForInputDir);
+        Path filePath = new Path(afile);
+        LOG.debug("Scan started for input {}", filePath.toString());
+        Map<String, Long> lastModifiedTimesForInputDir = null;
+        if (fs.exists(filePath)) {
+          FileStatus fileStatus = fs.getFileStatus(filePath);
+          if (fileStatus.isDirectory()) {
+            lastModifiedTimesForInputDir = referenceTimes.get(fileStatus.getPath().toString());
+          } else {
+            lastModifiedTimesForInputDir = referenceTimes.get(fileStatus.getPath().getParent().toString());
--- End diff --

@Priyanka: If we maintain two different keys, then FileSplitter will emit multiple file metadata entries for /home/myDir/file1.txt. Is that the expected behavior? Please correct me if I am wrong.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
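The duplicate-emission concern can be seen with nothing more than a HashMap: if the same physical directory is stored once under a fully qualified spelling and once under a scheme-free one (both spellings below are illustrative, not taken from the PR), referenceTimes ends up with two entries, and files under that directory are tracked, and emitted, twice. A minimal sketch:

import java.util.HashMap;
import java.util.Map;

// Sketch of the concern raised above: keying referenceTimes by the
// directory's own path in one place and by a file's getParent() string in
// another can produce two entries for one physical directory, so
// /home/myDir/file1.txt would be treated as newly discovered twice.
public class DuplicateKeyDemo
{
  public static void main(String[] args)
  {
    Map<String, Map<String, Long>> referenceTimes = new HashMap<>();

    // Key written while scanning the directory itself (fully qualified).
    referenceTimes.put("file:/home/myDir", new HashMap<>());
    // Key derived later from a file's parent (scheme-free).
    referenceTimes.put("/home/myDir", new HashMap<>());

    // Two map entries for one directory means file1.txt's modification time
    // is tracked under both, and its metadata can be emitted twice.
    System.out.println("distinct keys for one dir: " + referenceTimes.size());
  }
}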
[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2103 Fixed the scan...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/300#discussion_r65023670

--- Diff: library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java ---
@@ -375,11 +374,11 @@ public void run()
       lastScannedInfo = null;
       numDiscoveredPerIteration = 0;
       for (String afile : files) {
-        String filePath = new File(afile).getAbsolutePath();
-        LOG.debug("Scan started for input {}", filePath);
+        Path filePath = new Path(afile);
+        LOG.debug("Scan started for input {}", filePath.toString());
--- End diff --

Yeah. afile and filePath.toString() represent the same value.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2103 Fixed the scan...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/300#discussion_r65023539

--- Diff: library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java ---
@@ -375,11 +374,11 @@ public void run()
       lastScannedInfo = null;
       numDiscoveredPerIteration = 0;
       for (String afile : files) {
-        String filePath = new File(afile).getAbsolutePath();
-        LOG.debug("Scan started for input {}", filePath);
+        Path filePath = new Path(afile);
+        LOG.debug("Scan started for input {}", filePath.toString());
         Map<String, Long> lastModifiedTimesForInputDir;
-        lastModifiedTimesForInputDir = referenceTimes.get(filePath);
-        scan(new Path(afile), null, lastModifiedTimesForInputDir);
+        lastModifiedTimesForInputDir = referenceTimes.get(fs.getFileStatus(filePath).getPath().toString());
--- End diff --

No. In the case of LocalFileSystem, if filePath is a relative path, then the keys are again different. Setting the directory path in createScannedFileInfo() and looking up the value from referenceTimes must stay in sync.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
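One way to keep the two sides in sync, sketched here as a hypothetical helper rather than code from the PR, is to derive the referenceTimes key through a single method used both when storing in createScannedFileInfo() and when looking up in the scan loop:

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper illustrating the "must stay in sync" point: writer and
// reader both obtain the referenceTimes key from the same method, so relative
// and absolute spellings of a directory can never diverge into two keys.
public final class ReferenceTimeKeys
{
  private ReferenceTimeKeys()
  {
  }

  public static String keyFor(FileSystem fs, Path dir) throws IOException
  {
    FileStatus status = fs.getFileStatus(dir);
    return status.getPath().toUri().getPath(); // one canonical spelling per directory
  }
}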
[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2103 Fixed the scan...
GitHub user chaithu14 opened a pull request: https://github.com/apache/incubator-apex-malhar/pull/300 APEXMALHAR-2103 Fixed the scanner issue in FileSplitterInput Class You can merge this pull request into a Git repository by running: $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2103-ScannerIssue Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-apex-malhar/pull/300.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #300 commit 23e91ea0308721585bbdab007bad7adea4592796 Author: Chaitanya <chaita...@datatorrent.com> Date: 2016-05-27T09:53:05Z APEXMALHAR-2103 Fixed the scanner issue in FileSplitterInput Class --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2103 Fixed the scan...
Github user chaithu14 closed the pull request at: https://github.com/apache/incubator-apex-malhar/pull/299 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2103 Fixed the scan...
GitHub user chaithu14 opened a pull request: https://github.com/apache/incubator-apex-malhar/pull/299 APEXMALHAR-2103 Fixed the scanner issue in FileSplitterInput Class You can merge this pull request into a Git repository by running: $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2103-ScannerIssue Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-apex-malhar/pull/299.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #299 commit 94d01fd5a264684f6b372600a92c1c7d20b12537 Author: Chaitanya <chaita...@datatorrent.com> Date: 2016-05-27T06:40:48Z APEXMALHAR-2103 Fixed the scanner issue in FileSplitterInput Class --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---