[GitHub] apex-core pull request #519: APEXCORE-700 Add Evolving annotations for class...
GitHub user tushargosavi opened a pull request:

    https://github.com/apache/apex-core/pull/519

    APEXCORE-700 Add Evolving annotations for classes changed through 9856080ed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tushargosavi/apex-core APEXCORE-700

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-core/pull/519.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #519

commit ea0adae7209d9cd9d5586c38c2f5fc7b3e7cf8ab
Author: Tushar R. Gosavi <tus...@apache.org>
Date: 2017-04-27T07:26:39Z

    APEXCORE-700 Add Evolving annotations for classes changed through 9856080ed

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[GitHub] apex-core pull request #517: APEXCORE-709 Refactor code changes made through...
GitHub user tushargosavi opened a pull request:

    https://github.com/apache/apex-core/pull/517

    APEXCORE-709 Refactor code changes made through APEXCORE-575

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tushargosavi/apex-core APEXCORE-709

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-core/pull/517.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #517

commit 9370e4da61445639e59e336f39b9a41cc7021e3b
Author: Tushar R. Gosavi <tus...@apache.org>
Date: 2017-04-17T08:03:19Z

    APEXCORE-709 Refactor code changes made through APEXCORE-575
[GitHub] apex-core pull request #514: APEXCORE-702 Mark plugin interfaces as Evolving
GitHub user tushargosavi opened a pull request:

    https://github.com/apache/apex-core/pull/514

    APEXCORE-702 Mark plugin interfaces as Evolving

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tushargosavi/apex-core APEXCORE-702

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-core/pull/514.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #514

commit 00e28ab14c9b2dbeed86b9ea3005e9881b34ccb8
Author: Tushar R. Gosavi <tus...@apache.org>
Date: 2017-04-14T17:20:52Z

    APEXCORE-702 Mark plugin interfaces as Evolving
[GitHub] apex-core pull request #410: APEXCORE-408: Ability to schedule Sub-DAG from ...
GitHub user tushargosavi closed the pull request at: https://github.com/apache/apex-core/pull/410
[GitHub] apex-core pull request #503: APEXCORE-649 Provide snapshot of DAG to plugins...
GitHub user tushargosavi opened a pull request:

    https://github.com/apache/apex-core/pull/503

    APEXCORE-649 Provide snapshot of DAG to plugins instead of actual DAG

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tushargosavi/apex-core APEXCORE-649

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-core/pull/503.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #503

commit 98aa84b19f52bf218ba0b44b4f2da05ca26d2de8
Author: Tushar R. Gosavi <tus...@apache.org>
Date: 2017-03-27T05:51:16Z

    APEXCORE-649 Provide snapshot of DAG to plugins instead of actual DAG
[GitHub] apex-core pull request #476: APEXCORE-496 Provide way for StatsListeners to ...
GitHub user tushargosavi opened a pull request:

    https://github.com/apache/apex-core/pull/476

    APEXCORE-496 Provide way for StatsListeners to get additional information

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tushargosavi/apex-core APEXCORE-496

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-core/pull/476.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #476

commit 8663e443fc4e32639219a779a8444d9b4bec9e4a
Author: Tushar R. Gosavi <tus...@apache.org>
Date: 2017-02-21T02:57:02Z

    APEXCORE-496 Provide way for StatsListeners to get additional information.
[GitHub] apex-core pull request #475: Review Only -- APEXCORE-649 Infrastructure for ...
GitHub user tushargosavi opened a pull request:

    https://github.com/apache/apex-core/pull/475

    Review Only -- APEXCORE-649 Infrastructure for user-defined stram event listeners.

    Added a generic framework for application-wide listeners, as discussed in
    https://lists.apache.org/thread.html/1c37e0954cee029c9de537cc35ecb35beebdf49aba17de89bd2ce9ed@%3Cdev.apex.apache.org%3E

    The following functionality is added:
    - listening to container heartbeats to extract stats and metrics.
    - listening to stram events.
    - listening to committed window id move events.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tushargosavi/apex-core APEXCORE-649

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-core/pull/475.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #475

commit e2bf3e586d53c473c9d31c051720f82e34c5d845
Author: Tushar R. Gosavi <tus...@apache.org>
Date: 2017-02-20T14:19:05Z

    APEXCORE-649 Infrastructure for user-defined stram event listeners.
[GitHub] apex-core pull request #447: APEXCORE-604 extend DAG API to get operators an...
GitHub user tushargosavi reopened a pull request:

    https://github.com/apache/apex-core/pull/447

    APEXCORE-604 extend DAG API to get operators and streams from the DAG.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tushargosavi/apex-core APEXCORE-604

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-core/pull/447.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #447

commit 962bc95d351c22fb8922a6d8c30678f43e2fc98f
Author: Tushar R. Gosavi <tus...@apache.org>
Date: 2017-01-17T06:41:12Z

    APEXCORE-604 extend DAG API to get operators and streams from the DAG.
[GitHub] apex-core pull request #447: APEXCORE-604 extend DAG API to get operators an...
GitHub user tushargosavi closed the pull request at: https://github.com/apache/apex-core/pull/447
[GitHub] apex-core pull request #457: APEXCORE-625 log exception from eventloop.stop,...
GitHub user tushargosavi opened a pull request:

    https://github.com/apache/apex-core/pull/457

    APEXCORE-625 log exception from eventloop.stop, and let the original exception propagate to the caller.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tushargosavi/apex-core APEXCORE-625

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-core/pull/457.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #457

commit c82e222aacb04d97aed7e4fc1d1ef455e904dd27
Author: Tushar R. Gosavi <tus...@apache.org>
Date: 2017-01-27T08:23:14Z

    APEXCORE-625 log exception from eventloop.stop, and let the original exception propagate to the caller.
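The pattern named in the pull request title (log a failure raised while stopping, but let the exception that caused the shutdown reach the caller) might look roughly like the sketch below. This is not the code from the pull request: `StoppableLoop`, `StopAndPropagate` and the in-memory `LOG` list are hypothetical stand-ins, and real code would use the project's logger.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an event loop whose stop() may itself fail.
interface StoppableLoop {
  void stop() throws Exception;
}

class StopAndPropagate {
  // A list stands in for a logger so the sketch stays self-contained.
  static final List<String> LOG = new ArrayList<>();

  // Runs the work, then always stops the loop. A failure inside stop() is
  // only logged, so an exception thrown by the work is not replaced by it.
  static void runThenStop(Runnable work, StoppableLoop loop) {
    try {
      work.run();
    } finally {
      try {
        loop.stop();
      } catch (Exception stopFailure) {
        LOG.add("eventloop.stop failed: " + stopFailure.getMessage());
      }
    }
  }
}
```

Without the inner try/catch, an exception thrown from `stop()` inside the `finally` block would replace the exception thrown by the work, and the caller would never see the original failure.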
[GitHub] apex-core pull request #456: APEXCORE-577 Add visitor API to examine the DAG...
GitHub user tushargosavi opened a pull request:

    https://github.com/apache/apex-core/pull/456

    APEXCORE-577 Add visitor API to examine the DAG.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tushargosavi/apex-core APEXCORE-577

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-core/pull/456.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #456

commit 0da3a472c0bea14b49239e8e267042fdc513056e
Author: Tushar R. Gosavi <tus...@apache.org>
Date: 2016-12-22T09:06:06Z

    APEXCORE-577 Add visitor API to examine the DAG.
[GitHub] apex-core pull request #447: APEXCORE-604 extend DAG API to get operators an...
GitHub user tushargosavi opened a pull request:

    https://github.com/apache/apex-core/pull/447

    APEXCORE-604 extend DAG API to get operators and streams from the DAG.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tushargosavi/apex-core APEXCORE-604

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-core/pull/447.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #447

commit a339472d4f8186e191c62882441fd37aa2dd0602
Author: Tushar R. Gosavi <tus...@apache.org>
Date: 2017-01-17T06:41:12Z

    APEXCORE-604 extend DAG API to get operators and streams from the DAG.
[GitHub] apex-core pull request #424: APEXCORE-294 controlled shutdown of an applicat...
GitHub user tushargosavi reopened a pull request:

    https://github.com/apache/apex-core/pull/424

    APEXCORE-294 controlled shutdown of an application.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tushargosavi/apex-core APEXCORE-294

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-core/pull/424.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #424

commit 64fdedefd30fd7dfbe755afae4ecde1ad667c91b
Author: Tushar R. Gosavi <tus...@apache.org>
Date: 2017-01-03T07:16:33Z

    APEXCORE-294 controlled shutdown of an application.
[GitHub] apex-core pull request #446: APEXCORE-610 Avoid multiple calls to getBytes.
GitHub user tushargosavi opened a pull request:

    https://github.com/apache/apex-core/pull/446

    APEXCORE-610 Avoid multiple calls to getBytes.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tushargosavi/apex-core APEXCORE-610

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-core/pull/446.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #446

commit 44bc4b7a58298e228e749b58288e30187736ae86
Author: Tushar R. Gosavi <tus...@apache.org>
Date: 2017-01-17T08:38:13Z

    APEXCORE-610 Avoid multiple calls to getBytes.
[GitHub] apex-core pull request #439: APEXCORE-595: Don't update committedWindowId wh...
GitHub user tushargosavi opened a pull request:

    https://github.com/apache/apex-core/pull/439

    APEXCORE-595: Don't update committedWindowId when all partitions are removed.

    @tweise please review.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tushargosavi/apex-core APEXCORE-595

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-core/pull/439.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #439

commit b5c8e4bee839018867738f11ffe18ffa26b695d7
Author: Tushar R. Gosavi <tus...@apache.org>
Date: 2017-01-03T10:08:45Z

    APEXCORE-595: Don't update committedWindowId when all partitions are removed.
[GitHub] apex-core pull request #387: APEXCORE-525 fail validation if stateful strea...
GitHub user tushargosavi closed the pull request at: https://github.com/apache/apex-core/pull/387
[GitHub] apex-core pull request #424: APEXCORE-294 controlled shutdown of an applicat...
GitHub user tushargosavi opened a pull request:

    https://github.com/apache/apex-core/pull/424

    APEXCORE-294 controlled shutdown of an application.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tushargosavi/apex-core APEXCORE-294

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-core/pull/424.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #424

commit 82679e0f65a97d7686811539e9506b529e26ee42
Author: Tushar R. Gosavi <tus...@apache.org>
Date: 2016-11-25T05:31:23Z

    APEXCORE-294 controlled shutdown of an application.
[GitHub] apex-core pull request #422: APEXCORE-575 Improve application restart time.
GitHub user tushargosavi opened a pull request:

    https://github.com/apache/apex-core/pull/422

    APEXCORE-575 Improve application restart time.

    I have implemented a new storage agent (CascadeStorageAgent) which
    maintains two storage agents, one for the old checkpoint directory and
    one for the new checkpoint directory. Using this storage agent, we can
    direct reads to the old directory during the initial start and writes to
    the new checkpoint directory. This avoids copying checkpoints from the
    old directory to the new directory. With 2 GB of state, application
    restart time was brought down from 2 minutes to a few seconds.

    Other changes:
    - Add a log message to print the time taken to copy the initial state.
    - Do not copy the stats and events directories, as they are overwritten
      anyway in the new application.
    - Use the new storage agent to avoid copying the checkpoints directory.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tushargosavi/apex-core restart_optimizations

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-core/pull/422.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #422

commit edc02612efbb786243e6a0188d40fe1e08ac941b
Author: Tushar R. Gosavi <tus...@apache.org>
Date: 2016-11-18T18:06:15Z

    APEXCORE-575 Improve application restart time.
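The cascading idea described above (read from the old checkpoint location when the new one has nothing yet, always write to the new one) can be sketched as follows. This is only an illustration under assumptions: `CheckpointStore`, `InMemoryStore` and `CascadingStore` are hypothetical names, in-memory maps stand in for checkpoint directories, and the order of the lookup (new store first, then old) is a guess at one reasonable design. The real Apex storage agent interface and the actual CascadeStorageAgent differ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified storage interface used only for this sketch.
interface CheckpointStore {
  void save(String key, byte[] state);

  // Returns null when no state is stored under the key.
  byte[] load(String key);
}

// In-memory stand-in for a checkpoint directory.
class InMemoryStore implements CheckpointStore {
  private final Map<String, byte[]> data = new HashMap<>();

  @Override
  public void save(String key, byte[] state) {
    data.put(key, state);
  }

  @Override
  public byte[] load(String key) {
    return data.get(key);
  }
}

// Writes go to the new store only; reads fall back to the old store, so
// nothing has to be copied from the old location before restarting.
class CascadingStore implements CheckpointStore {
  private final CheckpointStore oldStore;
  private final CheckpointStore newStore;

  CascadingStore(CheckpointStore oldStore, CheckpointStore newStore) {
    this.oldStore = oldStore;
    this.newStore = newStore;
  }

  @Override
  public void save(String key, byte[] state) {
    newStore.save(key, state);
  }

  @Override
  public byte[] load(String key) {
    byte[] state = newStore.load(key);
    return state != null ? state : oldStore.load(key);
  }
}
```

The old store is never written to in this sketch, which is what makes skipping the up-front copy safe: the previous application's checkpoints stay untouched while the restarted application accumulates its own.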
[GitHub] apex-core pull request #410: APEXCORE-408: Ability to schedule Sub-DAG from ...
GitHub user tushargosavi opened a pull request:

    https://github.com/apache/apex-core/pull/410

    APEXCORE-408: Ability to schedule Sub-DAG from running application.

    Pull request for dynamic DAG modification through a stats listener. It
    provides the following functionality:
    - StatsListener can access the operator name, to easily detect which
      operator's stats are being processed.
    - StatsListener can create an instance of an object through which it can
      submit DAG modifications to the engine.
    - StatsListener can return DAG changes as a response to the engine.
    - PlanModifier is modified to take a DAG, apply it to the existing
      running DAG, and deploy the changes.

    The following functionality is not working yet:
    - The new operator does not start from the correct windowId
      (https://issues.apache.org/jira/browse/APEXCORE-532).
    - A relaunched application fails to start if it was killed after a
      dynamic DAG modification.
    - There is no support for resuming operators from their previous state
      after they were removed. This could be achieved by reading state from
      external storage on setup.
    - Persist operator support is not present for newly added streams.
    - Not all parts are covered by tests.

    The demo application using the feature is available at
    https://github.com/tushargosavi/apex-dynamic-scheduling

    There are two variations of the WordCount application. The first
    variation detects the presence of new files and starts a disconnected DAG
    to process the data
    (https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/WordCountApp.java).

    In the second application
    (https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/ExtendApp.java),
    initially only one reader operator is running in the DAG, and it provides
    pendingFiles as an auto-metric to the stats listener. On detecting
    pending files it attaches splitter, counter and output operators to the
    reader operator. Once the files are processed, the splitter, counter and
    output operators are removed, and they are added back again if new data
    files are added to the directory.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tushargosavi/incubator-apex-core APEXCORE-408

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-core/pull/410.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #410

commit 5cecd029463de021469afb20650e3afbf75fd088
Author: Tushar R. Gosavi <tus...@apache.org>
Date: 2016-08-08T10:24:41Z

    APEXCORE-408: Ability to schedule Sub-DAG from running application.
[GitHub] apex-core pull request #393: APEXCORE-408 Ability to schedule Sub-DAG from r...
GitHub user tushargosavi closed the pull request at: https://github.com/apache/apex-core/pull/393
[GitHub] apex-core pull request #361: APEXCORE-496 make operator name available to St...
GitHub user tushargosavi closed the pull request at: https://github.com/apache/apex-core/pull/361
[GitHub] apex-core pull request #402: APEXCORE-532: Fix issue where new operators add...
GitHub user tushargosavi opened a pull request:

    https://github.com/apache/apex-core/pull/402

    APEXCORE-532: Fix issue where new operators added to the DAG start from the initial checkpoint

    When new operators are added dynamically, they should start from the
    windowId of the upstream operator. The issue was that new operators get
    added using addLogicalOperator, which sets recoveryWindowId to
    INITIAL_CHECKPOINT, and this prevents getActivationCheckpoint from
    returning the correct window id of the operator during deploy.

    @PramodSSImmaneni please review.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tushargosavi/incubator-apex-core APEXCORE-532

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-core/pull/402.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #402

commit 30091217dced6d1581a8630374c942a811ab6ed2
Author: Tushar R. Gosavi <tus...@apache.org>
Date: 2016-10-03T07:07:59Z

    APEXCORE-532: Fix issue where new operators added to the DAG start from the initial checkpoint
[GitHub] apex-core pull request #393: **Review Only** Apexcore 408
GitHub user tushargosavi opened a pull request:

    https://github.com/apache/apex-core/pull/393

    **Review Only** Apexcore 408

    Pull request for dynamic DAG modification through a stats listener. It
    provides the following functionality:
    - StatsListener can access the operator name, to easily detect which
      operator's stats are being processed.
    - StatsListener can create an instance of an object through which it can
      submit DAG modifications to the engine.
    - StatsListener can return DAG changes as a response to the engine.
    - PlanModifier is modified to take a DAG, apply it to the existing
      running DAG, and deploy the changes.

    The following functionality is not working yet:
    - The new operator does not start from the correct windowId
      (https://issues.apache.org/jira/browse/APEXCORE-532).
    - A relaunched application fails to start if it was killed after a
      dynamic DAG modification.
    - There is no support for resuming operators from their previous state
      after they were removed. This could be achieved by reading state from
      external storage on setup.
    - Persist operator support is not present for newly added streams.
    - Not all parts are covered by tests.

    The demo application using the feature is available at
    https://github.com/tushargosavi/apex-dynamic-scheduling

    There are two variations of the WordCount application. The first
    variation detects the presence of new files and starts a disconnected DAG
    to process the data
    (https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/WordCountApp.java).

    In the second application
    (https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/ExtendApp.java),
    initially only one reader operator is running in the DAG, and it provides
    pendingFiles as an auto-metric to the stats listener. On detecting
    pending files it attaches splitter, counter and output operators to the
    reader operator. Once the files are processed, the splitter, counter and
    output operators are removed, and they are added back again if new data
    files are added to the directory.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tushargosavi/incubator-apex-core APEXCORE-408

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-core/pull/393.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #393

commit 05fcca2c919e2828f8348e5e4ed60d87e044b5ac
Author: Tushar R. Gosavi <tus...@apache.org>
Date: 2016-08-08T10:24:41Z

    APEXCORE-496: Provide way to access dag elements for statslisteners.

commit 8cf49f28a7732787c2517a5b345e4690be517280
Author: Tushar R. Gosavi <tus...@apache.org>
Date: 2016-07-05T08:40:26Z

    APEXCORE-408: Dynamic dag changes in Apex through StatsListeners
[GitHub] apex-core pull request #387: APEXCORE-525 fail validation if stateful strea...
GitHub user tushargosavi opened a pull request:

    https://github.com/apache/apex-core/pull/387

    APEXCORE-525 fail validation if stateful streamcodec does not implement newInstance method.

    If a user implements a stream codec that extends
    DefaultStatefulStreamCodec, they also need to remember to implement the
    newInstance method, because the platform uses the object returned by
    newInstance as the codec. If the user forgets to override newInstance,
    the platform keeps using the default codec without any warning. Such
    errors are caught very late, mostly as incorrect data distribution and
    incorrect results at the final operator.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tushargosavi/incubator-apex-core APEXCORE-525

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-core/pull/387.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #387

commit 8ab11fae86d608d5fb4a3e06153cee056b4cb5d5
Author: Tushar R. Gosavi <tus...@apache.org>
Date: 2016-09-12T06:43:06Z

    APEXCORE-525: fail validation if stateful streamcodec does not implement newInstance method.
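One way such a validation could detect a forgotten override is with reflection: look up `newInstance` on the user's class and check which class declares it. The sketch below is an assumption about the approach, not the code in the pull request. `BaseStatefulCodec` is a hypothetical stand-in for DefaultStatefulStreamCodec, and `GoodCodec`, `ForgetfulCodec` and `CodecValidator` are illustrative names.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for the platform's default stateful codec.
class BaseStatefulCodec {
  public BaseStatefulCodec newInstance() {
    return new BaseStatefulCodec();
  }
}

// A user codec that remembers to override newInstance.
class GoodCodec extends BaseStatefulCodec {
  @Override
  public BaseStatefulCodec newInstance() {
    return new GoodCodec();
  }
}

// A user codec that forgets the override and silently inherits the default.
class ForgetfulCodec extends BaseStatefulCodec {
}

class CodecValidator {
  // True when the class is the base codec itself, or when newInstance is
  // declared somewhere below the base class in the hierarchy.
  static boolean overridesNewInstance(Class<? extends BaseStatefulCodec> codecClass) {
    if (codecClass == BaseStatefulCodec.class) {
      return true;
    }
    try {
      Method method = codecClass.getMethod("newInstance");
      return method.getDeclaringClass() != BaseStatefulCodec.class;
    } catch (NoSuchMethodException e) {
      // Cannot happen for subclasses of BaseStatefulCodec; kept for completeness.
      throw new IllegalStateException(e);
    }
  }

  // Fails fast, at validation time, instead of at the final operator.
  static void validate(BaseStatefulCodec codec) {
    if (!overridesNewInstance(codec.getClass())) {
      throw new IllegalArgumentException(
          codec.getClass().getName() + " must override newInstance()");
    }
  }
}
```

Note that this check accepts a class that inherits the override from an intermediate parent. A stricter variant would require the declaring class to be the codec class itself.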
[GitHub] apex-core pull request #361: APEXCORE-496 make operator name available to St...
GitHub user tushargosavi commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/361#discussion_r74229007

--- Diff: api/src/main/java/com/datatorrent/api/StatsListener.java ---
@@ -115,6 +115,28 @@
     List getOperatorResponse();
   }

+  /**
+   * An interface to the DAG. Stats listener can get information about
+   * operator or other elements in the DAG through this interface. Currently
+   * we only provide method to extract the operator name based on the physical
+   * id of the operator. In future more methods can be added.
+   *
+   */
--- End diff --

Exposing all information may expose the internal implementation of the DAG, and we can only depend on apex-api. Can you suggest which information can be provided to the stats listener without adding a dependency on apex-engine? I can think of the following:
- name
- number of partitions for self
- aggregated stats at the logical operator level

This list could be expanded in the future. This JIRA puts the required infrastructure in place without breaking backward compatibility.
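To make the shape of the proposal concrete, a narrow read-only view of the DAG that resolves an operator name from a physical operator id could look like the sketch below. Every name here (`DagView`, `MapBackedDagView`, `NameAwareListener`) is hypothetical; the interface actually added in the pull request is not shown in this message, so this is only an illustration of the idea.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical narrow view of the DAG: it exposes only what a listener
// needs, so the listener never touches engine-internal classes.
interface DagView {
  String getOperatorName(int physicalOperatorId);
}

// Simple map-backed implementation, standing in for whatever the engine
// would provide.
class MapBackedDagView implements DagView {
  private final Map<Integer, String> names = new HashMap<>();

  void register(int physicalOperatorId, String logicalName) {
    names.put(physicalOperatorId, logicalName);
  }

  @Override
  public String getOperatorName(int physicalOperatorId) {
    return names.get(physicalOperatorId);
  }
}

// A listener-like class that uses the view to label the stats it receives.
class NameAwareListener {
  private final DagView dag;

  NameAwareListener(DagView dag) {
    this.dag = dag;
  }

  String describe(int physicalOperatorId, long tuplesProcessed) {
    return dag.getOperatorName(physicalOperatorId) + " processed " + tuplesProcessed + " tuples";
  }
}
```

Because the listener depends only on the small interface, further lookups such as partition counts or aggregated stats could be added to it later without changing existing listeners.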
[GitHub] apex-core issue #361: APEXCORE-496 make operator name available to StatsList...
GitHub user tushargosavi commented on the issue:

    https://github.com/apache/apex-core/pull/361

@vrozov @sandeshh I have updated the pull request; please review.
[GitHub] apex-malhar pull request #351: APEXMALHAR-2169 Fixed the issue of dynamic pa...
Github user tushargosavi commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/351#discussion_r73647526 --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java --- @@ -188,6 +188,8 @@ @Min(1) private int initialPartitionCount = 1; + private boolean isPartitionBasedOnLoad = false; --- End diff -- Add a comment about this flag.
[GitHub] apex-core pull request #361: APEXCORE-496 make operator name available to St...
Github user tushargosavi commented on a diff in the pull request: https://github.com/apache/apex-core/pull/361#discussion_r73645793 --- Diff: api/src/main/java/com/datatorrent/api/StatsListener.java --- @@ -113,6 +113,8 @@ long getLatencyMA(); List getOperatorResponse(); + +String getOperatorName(); --- End diff -- The stats listener only has access to BatchedOperatorStats, hence I added the method there. I am closing this pull request now and making changes as per the discussion on the dev thread.
[GitHub] apex-malhar pull request #282: APEXMALHAR-2066 JdbcPolling,idempotent,partit...
Github user tushargosavi commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/282#discussion_r70569998 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -0,0 +1,656 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.db.jdbc; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import javax.validation.constraints.Min; + +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.Operator.IdleTimeHandler; +import com.datatorrent.api.Partitioner; +import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.lib.db.AbstractStoreInputOperator; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.lib.util.KryoCloneUtils; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * Abstract operator for for consuming data using JDBC interface + * User needs User needs to provide + * tableName,dbConnection,setEmitColumnList,look-up key + * Optionally batchSize,pollInterval,Look-up key and a where clause can be given + * + * This operator uses static partitioning to arrive at range queries for exactly + * once reads + * Assumption is that there is an ordered column using which range queries can + * be formed + * If an emitColumnList is provided, please ensure that the keyColumn is the + * first column in the list + * Range queries are formed using the {@link JdbcMetaDataUtility}} Output - + * comma separated list of the emit columns eg columnA,columnB,columnC + * + * In the next iterations 
this operator would support an in-clause for + * idempotency instead of having only range query support to support non ordered + * key columns + * + * @displayName Jdbc Polling Input Operator + * @category Input + * @tags database, sql, jdbc, partitionable,exactlyOnce + */ +@Evolving +public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator<T, JdbcStore> +implements ActivationListener, IdleTimeHandler, Partitioner<AbstractJdbcPollInputOperator> +{ + /** + * poll interval in milliseconds + */ + private static int pollInterval = 1; + + @Min(1) + private int partitionCount = 1; + protected transient int operatorId; + protected transient boolean isReplayed; + protected transient boolean isPollable; + protected int batchSize; + protected static int fetchSize = 2; + /** + * Map of windowId to of the range key + */ + protected transient MutablePair<String, String> currentWindowRecoveryState; + + /** + * size of the emit queue used to hold polled records before emit + */ + private static int queueCapacity = 4 * 1024 * 1024; + private transient volatile boolean execute; + private transient AtomicReference cause; + protected transient int spinMillis; + private transient OperatorContext context; + protected String tableName; + protected String key; + protected long currentWindowId; + protected KeyValPair<String, String> r
[GitHub] apex-malhar pull request #330: Initial cut of Inner Join operator for REVIEW...
Github user tushargosavi commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r70043528 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java --- @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.join; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl; +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.hadoop.fs.Path; + +import com.google.common.collect.Maps; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.join.managed.ManagedSpillableComplexComponent; +import com.datatorrent.lib.join.managed.ManagedTimeStateMultiMap; +import com.datatorrent.netlet.util.Slice; + +public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends AbstractInnerJoinOperator<K,T> implements +Operator.CheckpointNotificationListener, Operator.CheckpointListener,Operator.IdleTimeHandler +{ + private static final transient Logger LOG = LoggerFactory.getLogger(AbstractManagedStateInnerJoinOperator.class); + public static final String stateDir = "managedState"; + public static final String stream1State = "stream1Data"; + public static final String stream2State = "stream2Data"; + private transient long sleepMillis; + private transient Map<JoinEvent<K,T>, Future> waitingEvents = Maps.newLinkedHashMap(); + private int noOfBuckets = 1; + private Long bucketSpanTime; + protected ManagedTimeStateImpl stream1Store; + protected ManagedTimeStateImpl stream2Store; + + @Override + public void createStores() + { +stream1Store = new ManagedTimeStateImpl(); +stream2Store = new ManagedTimeStateImpl(); +stream1Store.setNumBuckets(noOfBuckets); +stream2Store.setNumBuckets(noOfBuckets); +if (bucketSpanTime != null) { + 
stream1Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime)); + stream2Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime)); +} + +if (getExpiryTime() != null) { + stream1Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime())); + stream2Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime())); +} + +component = new ManagedSpillableComplexComponent(); +stream1Data = ((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream1Store, isStream1KeyPrimary()); +stream2Data = ((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream2Store, isStream2KeyPrimary()); + } + + @Override + protected void processTuple(T tuple, boolean isStream1Data) + { +Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data; +K key = extractKey(tuple,isStream1Data); +long timeBucket = extractTime(tuple,isStream1Data); +((ManagedTimeStateMultiMap)store).put(key, tuple,timeBucket); +joinStream(key,tuple,isStream1Data); + } + + @Override + protected void joinStream(K key, T tuple, boolean isStream1Data) + { +Spillable.SpillableByteArrayListMultimap<K, T> store = isStream1Data ? stream2Data : stream1Data; +Future future = ((ManagedTimeStateMultiMap)store).getAsync(ke
[GitHub] apex-malhar pull request #330: Initial cut of Inner Join operator for REVIEW...
Github user tushargosavi commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r70043086 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java --- @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.join; + +import java.util.Arrays; +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent; + +import com.google.common.base.Preconditions; + +import com.datatorrent.api.Context; +import com.datatorrent.common.util.BaseOperator; + +public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator +{ + protected transient String[][] includeFields; + protected transient List keyFields; + protected transient List timeFields; + + private Long expiryTime; + protected SpillableComplexComponent component; + protected Spillable.SpillableByteArrayListMultimap<K,T> stream1Data; + protected Spillable.SpillableByteArrayListMultimap<K,T> stream2Data; + private boolean isStream1KeyPrimary = false; + private boolean isStream2KeyPrimary = false; + + @NotNull + private String keyFieldsStr; + @NotNull + private String includeFieldStr; + private String timeFieldsStr; + + protected void processTuple(T tuple, boolean isStream1Data) + { +Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data; +K key = extractKey(tuple,isStream1Data); +store.put(key, tuple); +joinStream(key,tuple,isStream1Data); + } + + protected void joinStream(K key, T tuple, boolean isStream1Data) + { +Spillable.SpillableByteArrayListMultimap<K, T> store = isStream1Data ? stream2Data : stream1Data; +List value = store.get(key); + +// Join the input tuple with the joined tuples +if (value != null) { + for (T joinedValue : value) { +T result = isStream1Data ? joinTuples(Arrays.asList(tuple, joinedValue)) : --- End diff -- Why does joinTuples take a list as its argument?
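The question about the list argument can be illustrated by putting the two signature shapes side by side. This is only a sketch with `String` tuples; the class name `JoinSignatures` and both method bodies are assumptions made for illustration, not the operator's actual implementation.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: two possible shapes for a two-stream join function.
class JoinSignatures {
  // Shape used in the diff: a positional list, where index 0 is taken to be
  // the stream-1 tuple and index 1 the stream-2 tuple.
  static String joinTuples(List<String> tuples) {
    return tuples.get(0) + "|" + tuples.get(1);
  }

  // Alternative shape: explicit parameters, so the compiler enforces that
  // exactly two tuples are supplied and the names document the order.
  static String joinTuples(String stream1Tuple, String stream2Tuple) {
    return stream1Tuple + "|" + stream2Tuple;
  }
}
```

With explicit parameters there is no per-call list allocation and no way to pass the wrong number of tuples; a list would mainly pay off if the join were later generalized to more than two streams.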
[GitHub] apex-malhar pull request #330: Initial cut of Inner Join operator for REVIEW...
Github user tushargosavi commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r70042536 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java --- @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.join; + +import java.util.Arrays; +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent; + +import com.google.common.base.Preconditions; + +import com.datatorrent.api.Context; +import com.datatorrent.common.util.BaseOperator; + +public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator +{ + protected transient String[][] includeFields; + protected transient List keyFields; + protected transient List timeFields; + + private Long expiryTime; + protected SpillableComplexComponent component; + protected Spillable.SpillableByteArrayListMultimap<K,T> stream1Data; + protected Spillable.SpillableByteArrayListMultimap<K,T> stream2Data; + private boolean isStream1KeyPrimary = false; + private boolean isStream2KeyPrimary = false; + + @NotNull + private String keyFieldsStr; + @NotNull + private String includeFieldStr; + private String timeFieldsStr; + + protected void processTuple(T tuple, boolean isStream1Data) --- End diff -- Instead of a boolean, can you pass the store directly?
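A rough sketch of what passing the store directly could look like is given below. It uses plain in-memory maps and `String` tuples instead of the spillable multimaps, and the class name `StoreJoinSketch` and the `emitted` list are hypothetical stand-ins for the operator and its output port.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: plain maps stand in for the spillable multimaps in the diff.
class StoreJoinSketch {
  final Map<String, List<String>> stream1Data = new HashMap<>();
  final Map<String, List<String>> stream2Data = new HashMap<>();
  final List<String> emitted = new ArrayList<>();

  // The caller chooses both stores, so no boolean has to be decoded here.
  void processTuple(String key, String tuple,
      Map<String, List<String>> ownStore, Map<String, List<String>> otherStore) {
    // Remember the tuple in the store of the stream it arrived on.
    ownStore.computeIfAbsent(key, k -> new ArrayList<>()).add(tuple);
    // Join it against everything already seen on the opposite stream.
    for (String match : otherStore.getOrDefault(key, Collections.<String>emptyList())) {
      emitted.add(tuple + "+" + match);
    }
  }
}
```

One caveat: in the diff the boolean is also used to decide the argument order for joinTuples, so a store-passing version would still need some way to know which side is stream 1 if the output order matters.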
[GitHub] apex-malhar pull request #330: Initial cut of Inner Join operator for REVIEW...
Github user tushargosavi commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r70042432 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java --- @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.join; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl; +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.hadoop.fs.Path; + +import com.google.common.collect.Maps; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.join.managed.ManagedSpillableComplexComponent; +import com.datatorrent.lib.join.managed.ManagedTimeStateMultiMap; +import com.datatorrent.netlet.util.Slice; + +public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends AbstractInnerJoinOperator<K,T> implements +Operator.CheckpointNotificationListener, Operator.CheckpointListener,Operator.IdleTimeHandler +{ + private static final transient Logger LOG = LoggerFactory.getLogger(AbstractManagedStateInnerJoinOperator.class); + public static final String stateDir = "managedState"; + public static final String stream1State = "stream1Data"; + public static final String stream2State = "stream2Data"; + private transient long sleepMillis; + private transient Map<JoinEvent<K,T>, Future> waitingEvents = Maps.newLinkedHashMap(); + private int noOfBuckets = 1; + private Long bucketSpanTime; + protected ManagedTimeStateImpl stream1Store; + protected ManagedTimeStateImpl stream2Store; + + @Override + public void createStores() + { +stream1Store = new ManagedTimeStateImpl(); +stream2Store = new ManagedTimeStateImpl(); +stream1Store.setNumBuckets(noOfBuckets); +stream2Store.setNumBuckets(noOfBuckets); +if (bucketSpanTime != null) { + 
stream1Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime)); + stream2Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime)); +} + +if (getExpiryTime() != null) { --- End diff -- Can it be different for each store?
[GitHub] apex-core pull request #351: APEXCORE-405 Common API to launch on local mode...
Github user tushargosavi commented on a diff in the pull request: https://github.com/apache/apex-core/pull/351#discussion_r69280615 --- Diff: engine/src/main/java/com/datatorrent/stram/LocalModeImpl.java --- @@ -54,6 +55,53 @@ public DAG cloneDAG() throws Exception } @Override + public LocalAppHandle launchApp(StreamingApplication application, Configuration configuration, Attribute.AttributeMap + launchAttributes) throws LaunchException + { +try { + //application.populateDAG(getDAG(), configuration == null ? new Configuration(false) : configuration); --- End diff -- Remove the commented-out code.
[GitHub] apex-malhar pull request #318: APEXMALHAR-2119 add setters for partition cou...
Github user tushargosavi commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/318#discussion_r67813977 --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java --- @@ -1110,6 +1116,16 @@ public String toString() return "DirectoryScanner [filePatternRegexp=" + filePatternRegexp + " partitionIndex=" + partitionIndex + " partitionCount=" + partitionCount + "]"; } + +protected void setPartitionIndex(int partitionIndex) +{ + this.partitionIndex = partitionIndex; +} + +protected void setPartitionCount(int partitionCount) +{ + this.partitionCount = partitionCount; +} --- End diff -- Added a test for setPartition and verified that the base function uses the value set.
[GitHub] apex-malhar pull request #318: APEXMALHAR-2119 add setters for partition cou...
Github user tushargosavi commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/318#discussion_r67808791 --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java --- @@ -1110,6 +1116,16 @@ public String toString() return "DirectoryScanner [filePatternRegexp=" + filePatternRegexp + " partitionIndex=" + partitionIndex + " partitionCount=" + partitionCount + "]"; } + +protected void setPartitionIndex(int partitionIndex) +{ + this.partitionIndex = partitionIndex; +} + +protected void setPartitionCount(int partitionCount) +{ + this.partitionCount = partitionCount; +} --- End diff -- With the kryo.clone change in createPartition, the values are now set on the derived object from there. Initially createPartition created an object of type DirectoryScanner, so a derived class had no option but to override it, create an object of its own type, and then set the count and index values; it could not do that, because those fields were private and the base class had no setters.
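The situation described above can be sketched as follows. The class names, the `acceptFile` hashing rule, and the plain constructor call in `createPartition` are all simplifying assumptions for illustration; in particular, the real code clones the scanner rather than constructing a new one.

```java
// Simplified stand-in for the base scanner: the fields stay private and
// subclasses reach them only through the protected setters.
class DirectoryScannerSketch {
  private int partitionIndex;
  private int partitionCount = 1;

  protected void setPartitionIndex(int partitionIndex) {
    this.partitionIndex = partitionIndex;
  }

  protected void setPartitionCount(int partitionCount) {
    this.partitionCount = partitionCount;
  }

  // Base-class logic that depends on the private fields (assumed rule:
  // a file belongs to the partition selected by its path hash).
  boolean acceptFile(String path) {
    return Math.floorMod(path.hashCode(), partitionCount) == partitionIndex;
  }
}

// A derived scanner that creates partitions of its own type.
class CustomScannerSketch extends DirectoryScannerSketch {
  // Stand-in for an overridden createPartition; a plain constructor call
  // replaces the clone step here.
  CustomScannerSketch createPartition(int index, int count) {
    CustomScannerSketch copy = new CustomScannerSketch();
    copy.setPartitionIndex(index);
    copy.setPartitionCount(count);
    return copy;
  }
}
```

Without the two protected setters, `createPartition` in the subclass would compile only if it left the index and count at their defaults, so every partition would accept the same files.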
[GitHub] apex-malhar pull request #319: REVIEW ONLY: Operator supporting the Beam con...
Github user tushargosavi commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/319#discussion_r67679805 --- Diff: stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java --- @@ -53,4 +60,121 @@ */ public interface WindowedStream extends ApexStream { + + /** + * Count of all tuples + * @return new stream of Integer + */ + > STREAM count(); + + /** + * Count tuples by the key + * If the input is KeyedTuple it will get the key from getKey method from the tuple + * If not, use the tuple itself as a key + * @return new stream of Map + */ + >> STREAM countByKey(); + + /** + * + * Count tuples by the indexed key + * @param key the index of the field in the tuple that are used as key + * @return new stream of Map + */ + >> STREAM countByKey(int key); + + + /** + * + * Return top tuples by the selected key + * @return new stream of Key and top N tuple of the key + */ + <TUPLE, KEY, STREAM extends ApexStream<Map.Entry<KEY, List>>> STREAM topByKey(int N); + + /** + * + * Return top tuples of all tuples in the window + * @return new stream of Map + */ + > STREAM top(int N); + + <O, STREAM extends ApexStream> STREAM combineByKey(); + + <O, STREAM extends ApexStream> STREAM combine(); + + /** + * Reduce transformation + * Add an operator to the DAG which merge tuple t1, t2 to new tuple + * @param name operator name + * @param reduce reduce function + * @return new stream of same type + */ + > STREAM reduce(String name, Function.ReduceFunction reduce); + + /** + * Fold transformation + * Add an operator to the DAG which merge tuple T to accumulated result tuple O + * @param initialValue initial result value + * @param fold fold function + * @param Result type + * @return new stream of type O + */ + <O, STREAM extends ApexStream> STREAM fold(O initialValue, Function.FoldFunction<T, O> fold); + + /** + * Fold transformation + * Add an operator to the DAG which merge tuple T to accumulated result tuple O + * @param name name of the 
operator + * @param initialValue initial result value + * @param fold fold function + * @param Result type + * @return new stream of type O + */ + <O, STREAM extends ApexStream> STREAM fold(String name, O initialValue, Function.FoldFunction<T, O> fold); + + + /** + * Fold transformation + * Add an operator to the DAG which merge tuple T to accumulated result tuple O + * @param name name of the operator + * @param fold fold function + * @param Result type + * @return new stream of type O + */ + <O, K, STREAM extends ApexStream<KeyValPair<K, O>>> STREAM foldByKey(String name, Function.FoldFunction<T, KeyValPair<K, O>> fold); + + /** + * Fold transformation + * Add an operator to the DAG which merge tuple T to accumulated result tuple O + * @param fold fold function + * @param Result type + * @return new stream of type O + */ + <O, K, STREAM extends ApexStream<KeyValPair<K, O>>> STREAM foldByKey(Function.FoldFunction<T, KeyValPair<K, O>> fold); + + + /** + * Reduce transformation + * Add an operator to the DAG which merge tuple t1, t2 to new tuple + * @param reduce reduce function + * @return new stream of same type + */ + > STREAM reduce(Function.ReduceFunction reduce); --- End diff -- reduceByKey ??
[GitHub] apex-malhar pull request #319: REVIEW ONLY: Operator supporting the Beam con...
Github user tushargosavi commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/319#discussion_r67652351 --- Diff: stream/src/main/java/org/apache/apex/malhar/stream/window/WindowedStorage.java --- @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.window; + +import com.datatorrent.api.StreamCodec; +import org.apache.hadoop.classification.InterfaceStability; + +import java.util.Map; +import java.util.Set; + +/** + * This interface is for a key/value store for storing data for windowed streams. + * The key to this key/value store is a pair of (Window, K) + * + * Note that this interface does not take recovery into consideration. + * + */ +@InterfaceStability.Evolving +public interface WindowedStorage<K, V> +{ + /** + * Sets the codec (serializer and deserializer) for W. This will be used when storing and retrieving the window to and + * from the storage. In most cases, the starting epoch millis should be used for TimedWindow and the starting tuple + * offset should be used for CountWindow + * + * @param codec + */ + void setWindowCodec(StreamCodec codec); + + /** + * Sets the codec (serializer and deserializer) for K. 
This will be used when storing and retrieving the key to and + * from the storage + * + * @param codec + */ + void setKeyCodec(StreamCodec codec); + + /** + * Sets the codec (serializer and deserializer) for V. This will be used when storing and retrieving the value to and + * from the storage + * + * @param codec + */ + void setValueCodec(StreamCodec codec); + + /** + * Sets the data associated with the given window and the key + * + * @param window + * @param key + * @param value + */ + void put(Window window, K key, V value); + + /** + * Gets the key/value pairs associated with the given window + * + * @param window + * @return + */ + Set<Map.Entry<K, V>> entrySet(Window window); --- End diff -- Can it be Iterator<Map.Entry<K,V>>, in case the state is huge and stored in the file system?
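The concern behind the Iterator suggestion is that a `Set` normally implies all entries are materialized at once, while an iterator can fetch them one at a time. The sketch below shows that difference with a hypothetical `LazyEntries` class; the `load` method and the `loads` counter merely simulate reads from a file-system-backed store and are not part of WindowedStorage.

```java
import java.util.AbstractMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical lazy view over stored entries: a value is read only when the
// iterator actually reaches its key.
class LazyEntries implements Iterable<Map.Entry<String, Long>> {
  private final List<String> storedKeys; // stand-in for keys kept on disk
  int loads = 0;                         // counts simulated reads

  LazyEntries(List<String> storedKeys) {
    this.storedKeys = storedKeys;
  }

  // Simulated read of one value from the backing store.
  private Long load(String key) {
    loads++;
    return (long) key.length();
  }

  @Override
  public Iterator<Map.Entry<String, Long>> iterator() {
    final Iterator<String> keys = storedKeys.iterator();
    return new Iterator<Map.Entry<String, Long>>() {
      @Override
      public boolean hasNext() {
        return keys.hasNext();
      }

      @Override
      public Map.Entry<String, Long> next() {
        String key = keys.next();
        return new AbstractMap.SimpleImmutableEntry<>(key, load(key));
      }
    };
  }
}
```

A caller that stops after the first few entries triggers only that many reads, which is the property a `Set` return type makes hard to guarantee.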
[GitHub] apex-malhar pull request #318: APEXMALHAR-2119 add setters for partition cou...
GitHub user tushargosavi opened a pull request: https://github.com/apache/apex-malhar/pull/318 APEXMALHAR-2119 add setters for partition count and index. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tushargosavi/incubator-apex-malhar APEXMALHAR-2119 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-malhar/pull/318.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #318 commit 82cfbbbcca80c4f6eb4d76cb35a6fb5402db0d58 Author: Tushar R. Gosavi <tus...@apache.org> Date: 2016-06-15T11:42:16Z APEXMALHAR-2119 add setters for partition count and index.
[GitHub] incubator-apex-malhar pull request: APEXMALHAR-998 extend second type parame...
GitHub user tushargosavi opened a pull request: https://github.com/apache/incubator-apex-malhar/pull/305 APEXMALHAR-998 extend second type parameter from Object You can merge this pull request into a Git repository by running: $ git pull https://github.com/tushargosavi/incubator-apex-malhar APEXMALHAR-998 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-apex-malhar/pull/305.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #305 commit e809bee20b0761e32bf4b1213e0dafa2791178de Author: Tushar R. Gosavi <tus...@apache.org> Date: 2016-06-01T09:32:27Z APEXMALHAR-998 extend second type parameter from Object
[GitHub] incubator-apex-core pull request: *Review only* Prototype of runni...
Github user tushargosavi commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/340#discussion_r64374984

--- Diff: engine/src/main/java/com/datatorrent/stram/StramUtils.java ---
@@ -133,4 +135,37 @@ public static JSONObject getStackTrace()
     return jsonObject;
   }
+  public static JSONObject getStackTrace2() throws JSONException
+  {
+
+String result = "hello";
+try {
+
+  String processName = java.lang.management.ManagementFactory.getRuntimeMXBean().getName();
+  Long processId = Long.parseLong(processName.split("@")[0]);
+
+  String cmd = "jstack " + processId;
--- End diff --

jstack needs to be in the PATH, otherwise the above command might fail. We also need to make sure that this tool works with different JVM implementations.
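One way to sidestep both concerns raised above (jstack missing from the PATH, and jstack not shipping with every JVM) is to collect the stack traces in-process through the standard java.lang.management API instead of launching an external tool. This is a sketch of that alternative only, not what the pull request does. The class name InProcessThreadDump and the output layout are assumptions made for illustration, and the text it produces is not identical to jstack output.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper: an in-process alternative to running "jstack <pid>".
class InProcessThreadDump
{
  /** Returns a plain-text dump of all live threads in the current JVM. */
  static String dump()
  {
    ThreadMXBean threads = ManagementFactory.getThreadMXBean();
    StringBuilder sb = new StringBuilder();
    // false, false: skip locked monitor and synchronizer details, which
    // not every JVM supports.
    for (ThreadInfo info : threads.dumpAllThreads(false, false)) {
      sb.append('"').append(info.getThreadName()).append("\" ")
          .append(info.getThreadState()).append('\n');
      for (StackTraceElement frame : info.getStackTrace()) {
        sb.append("    at ").append(frame).append('\n');
      }
    }
    return sb.toString();
  }

  public static void main(String[] args)
  {
    System.out.print(dump());
  }
}
```

Because this runs inside the process being inspected, it needs no process id and no external binary. It cannot help when the JVM itself is hung badly enough that application code no longer runs, which is a case where an external tool such as jstack is still useful.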