[GitHub] apex-core pull request #519: APEXCORE-700 Add Evolving annotations for class...

2017-04-27 Thread tushargosavi
GitHub user tushargosavi opened a pull request:

https://github.com/apache/apex-core/pull/519

APEXCORE-700 Add Evolving annotations for classes changed through 9856080ed

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tushargosavi/apex-core APEXCORE-700

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/519.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #519


commit ea0adae7209d9cd9d5586c38c2f5fc7b3e7cf8ab
Author: Tushar R. Gosavi <tus...@apache.org>
Date:   2017-04-27T07:26:39Z

APEXCORE-700 Add Evolving annotations for classes changed through 9856080ed




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] apex-core pull request #517: APEXCORE-709 Refactor code changes made through...

2017-04-17 Thread tushargosavi
GitHub user tushargosavi opened a pull request:

https://github.com/apache/apex-core/pull/517

APEXCORE-709 Refactor code changes made through APEXCORE-575



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tushargosavi/apex-core APEXCORE-709

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/517.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #517


commit 9370e4da61445639e59e336f39b9a41cc7021e3b
Author: Tushar R. Gosavi <tus...@apache.org>
Date:   2017-04-17T08:03:19Z

APEXCORE-709 Refactor code changes made through APEXCORE-575






[GitHub] apex-core pull request #514: APEXCORE-702 Mark plugin interfaces as Evolving

2017-04-14 Thread tushargosavi
GitHub user tushargosavi opened a pull request:

https://github.com/apache/apex-core/pull/514

APEXCORE-702 Mark plugin interfaces as Evolving



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tushargosavi/apex-core APEXCORE-702

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/514.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #514


commit 00e28ab14c9b2dbeed86b9ea3005e9881b34ccb8
Author: Tushar R. Gosavi <tus...@apache.org>
Date:   2017-04-14T17:20:52Z

APEXCORE-702 Mark plugin interfaces as Evolving






[GitHub] apex-core pull request #410: APEXCORE-408: Ability to schedule Sub-DAG from ...

2017-04-07 Thread tushargosavi
Github user tushargosavi closed the pull request at:

https://github.com/apache/apex-core/pull/410




[GitHub] apex-core pull request #503: APEXCORE-649 Provide snapshot of DAG to plugins...

2017-04-04 Thread tushargosavi
GitHub user tushargosavi opened a pull request:

https://github.com/apache/apex-core/pull/503

APEXCORE-649 Provide snapshot of DAG to plugins instead of actual DAG
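One way to see why plugins might receive a snapshot instead of the live DAG is a defensive, read-only copy. The minimal sketch below models a toy DAG as a map from operator name to downstream operator names. `DagSnapshot` is a hypothetical class, not the one introduced by this pull request.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical immutable snapshot of a toy DAG (operator name -> downstream operator names).
final class DagSnapshot {
  private final Map<String, Set<String>> edges;

  DagSnapshot(Map<String, Set<String>> liveDag) {
    Map<String, Set<String>> copy = new LinkedHashMap<>();
    for (Map.Entry<String, Set<String>> e : liveDag.entrySet()) {
      // Copy each adjacency set so later changes to the live DAG are not visible here.
      copy.put(e.getKey(), Collections.unmodifiableSet(new LinkedHashSet<>(e.getValue())));
    }
    this.edges = Collections.unmodifiableMap(copy);
  }

  // Names of all operators at the time the snapshot was taken (read-only view).
  Set<String> operators() {
    return edges.keySet();
  }

  // Downstream operators of the given operator, or an empty set if it is unknown.
  Set<String> downstreamOf(String operator) {
    return edges.getOrDefault(operator, Collections.emptySet());
  }
}
```

Because both the map and each adjacency set are copied, a plugin holding the snapshot cannot see later changes to the live DAG. It also cannot mutate the copy.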



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tushargosavi/apex-core APEXCORE-649

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/503.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #503


commit 98aa84b19f52bf218ba0b44b4f2da05ca26d2de8
Author: Tushar R. Gosavi <tus...@apache.org>
Date:   2017-03-27T05:51:16Z

APEXCORE-649 Provide snapshot of DAG to plugins instead of actual DAG






[GitHub] apex-core pull request #476: APEXCORE-496 Provide way for StatsListeners to ...

2017-02-20 Thread tushargosavi
GitHub user tushargosavi opened a pull request:

https://github.com/apache/apex-core/pull/476

APEXCORE-496 Provide way for StatsListeners to get additional information



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tushargosavi/apex-core APEXCORE-496

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/476.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #476


commit 8663e443fc4e32639219a779a8444d9b4bec9e4a
Author: Tushar R. Gosavi <tus...@apache.org>
Date:   2017-02-21T02:57:02Z

APEXCORE-496 Provide way for StatsListeners to get additional information.






[GitHub] apex-core pull request #475: Review Only -- APEXCORE-649 Infrastructure for ...

2017-02-20 Thread tushargosavi
GitHub user tushargosavi opened a pull request:

https://github.com/apache/apex-core/pull/475

Review Only -- APEXCORE-649 Infrastructure for user-defined stram event listeners.

Added a generic framework for application-wide listeners, as discussed in

https://lists.apache.org/thread.html/1c37e0954cee029c9de537cc35ecb35beebdf49aba17de89bd2ce9ed@%3Cdev.apex.apache.org%3E

The following functionality is added:
- listening to container heartbeats to extract stats and metrics.
- listening to stram events.
- committed window id move events.
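A generic listener framework covering the three event types above might look roughly like the sketch below. It rests on assumptions: `ApplicationListener`, `ListenerRegistry`, and the callback signatures are hypothetical, not the API proposed in this pull request.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener interface: one callback per event type described in the PR.
// Default no-op methods let a listener implement only the callbacks it needs.
interface ApplicationListener {
  default void onHeartbeat(int containerId, long processedTuples) {}
  default void onStramEvent(String eventType) {}
  default void onCommittedWindowChange(long committedWindowId) {}
}

// Hypothetical registry the application master would call; it fans each event out to all listeners.
final class ListenerRegistry {
  private final List<ApplicationListener> listeners = new ArrayList<>();

  void register(ApplicationListener listener) {
    listeners.add(listener);
  }

  void heartbeat(int containerId, long processedTuples) {
    for (ApplicationListener l : listeners) {
      l.onHeartbeat(containerId, processedTuples);
    }
  }

  void stramEvent(String eventType) {
    for (ApplicationListener l : listeners) {
      l.onStramEvent(eventType);
    }
  }

  void committedWindow(long windowId) {
    for (ApplicationListener l : listeners) {
      l.onCommittedWindowChange(windowId);
    }
  }
}
```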



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tushargosavi/apex-core APEXCORE-649

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/475.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #475


commit e2bf3e586d53c473c9d31c051720f82e34c5d845
Author: Tushar R. Gosavi <tus...@apache.org>
Date:   2017-02-20T14:19:05Z

APEXCORE-649 Infrastructure for user-defined stram event listeners.






[GitHub] apex-core pull request #447: APEXCORE-604 extend DAG API to get operators an...

2017-02-01 Thread tushargosavi
GitHub user tushargosavi reopened a pull request:

https://github.com/apache/apex-core/pull/447

APEXCORE-604 extend DAG API to get operators and streams from the DAG.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tushargosavi/apex-core APEXCORE-604

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/447.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #447


commit 962bc95d351c22fb8922a6d8c30678f43e2fc98f
Author: Tushar R. Gosavi <tus...@apache.org>
Date:   2017-01-17T06:41:12Z

APEXCORE-604 extend DAG API to get operators and streams from the DAG.






[GitHub] apex-core pull request #447: APEXCORE-604 extend DAG API to get operators an...

2017-02-01 Thread tushargosavi
Github user tushargosavi closed the pull request at:

https://github.com/apache/apex-core/pull/447




[GitHub] apex-core pull request #457: APEXCORE-625 log exception from eventloop.stop,...

2017-01-27 Thread tushargosavi
GitHub user tushargosavi opened a pull request:

https://github.com/apache/apex-core/pull/457

APEXCORE-625 log exception from eventloop.stop, and let the original exception propagate to the caller.
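The pattern described here is to log a failure from the cleanup step so it cannot mask the original exception. Plain Java can express it as a sketch. `EventLoop` and `Runner` are hypothetical stand-ins, not the actual Apex or netlet classes.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for an event loop whose stop() may itself fail.
interface EventLoop {
  void stop();
}

final class Runner {
  private static final Logger LOG = Logger.getLogger(Runner.class.getName());

  // Runs the work and always stops the loop. A RuntimeException from stop() is only
  // logged, so an exception thrown by the work reaches the caller unchanged
  // (and a stop() failure after successful work is logged rather than rethrown).
  static void runAndStop(Runnable work, EventLoop loop) {
    try {
      work.run();
    } finally {
      try {
        loop.stop();
      } catch (RuntimeException e) {
        LOG.log(Level.WARNING, "Exception while stopping event loop", e);
      }
    }
  }
}
```

Without the inner `try`/`catch`, an exception thrown from `stop()` inside `finally` would replace the exception from `work.run()`. The caller would then see the cleanup failure instead of the root cause.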



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tushargosavi/apex-core APEXCORE-625

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/457.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #457


commit c82e222aacb04d97aed7e4fc1d1ef455e904dd27
Author: Tushar R. Gosavi <tus...@apache.org>
Date:   2017-01-27T08:23:14Z

APEXCORE-625 log exception from eventloop.stop, and let the original
exception propagate to the caller.






[GitHub] apex-core pull request #456: APEXCORE-577 Add visitor API to examine the DAG...

2017-01-26 Thread tushargosavi
GitHub user tushargosavi opened a pull request:

https://github.com/apache/apex-core/pull/456

APEXCORE-577 Add visitor API to examine the DAG.
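A visitor API for examining a DAG typically offers one callback per element type. The minimal sketch below uses a toy DAG. `DagVisitor`, `ToyDag`, and their methods are hypothetical and do not reflect the interface added in this pull request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical visitor with one callback per DAG element type.
interface DagVisitor {
  void visitOperator(String name);
  void visitStream(String name, String source, List<String> sinks);
}

// Toy DAG that walks its elements and hands each one to the visitor.
final class ToyDag {
  private static final class StreamDef {
    final String name;
    final String source;
    final List<String> sinks;

    StreamDef(String name, String source, List<String> sinks) {
      this.name = name;
      this.source = source;
      this.sinks = sinks;
    }
  }

  private final List<String> operators = new ArrayList<>();
  private final List<StreamDef> streams = new ArrayList<>();

  ToyDag addOperator(String name) {
    operators.add(name);
    return this;
  }

  ToyDag addStream(String name, String source, String... sinks) {
    streams.add(new StreamDef(name, source, new ArrayList<>(Arrays.asList(sinks))));
    return this;
  }

  // Operators are visited first, then streams, each in insertion order.
  void accept(DagVisitor visitor) {
    for (String op : operators) {
      visitor.visitOperator(op);
    }
    for (StreamDef s : streams) {
      visitor.visitStream(s.name, s.source, s.sinks);
    }
  }
}
```

The visitor keeps the traversal inside the DAG class. Callers can then inspect elements without access to its internal collections.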



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tushargosavi/apex-core APEXCORE-577

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/456.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #456


commit 0da3a472c0bea14b49239e8e267042fdc513056e
Author: Tushar R. Gosavi <tus...@apache.org>
Date:   2016-12-22T09:06:06Z

APEXCORE-577 Add visitor API to examine the DAG.






[GitHub] apex-core pull request #447: APEXCORE-604 extend DAG API to get operators an...

2017-01-18 Thread tushargosavi
GitHub user tushargosavi opened a pull request:

https://github.com/apache/apex-core/pull/447

APEXCORE-604 extend DAG API to get operators and streams from the DAG.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tushargosavi/apex-core APEXCORE-604

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/447.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #447


commit a339472d4f8186e191c62882441fd37aa2dd0602
Author: Tushar R. Gosavi <tus...@apache.org>
Date:   2017-01-17T06:41:12Z

APEXCORE-604 extend DAG API to get operators and streams from the DAG.






[GitHub] apex-core pull request #424: APEXCORE-294 controlled shutdown of an applicat...

2017-01-18 Thread tushargosavi
GitHub user tushargosavi reopened a pull request:

https://github.com/apache/apex-core/pull/424

APEXCORE-294 controlled shutdown of an application.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tushargosavi/apex-core APEXCORE-294

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/424.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #424


commit 64fdedefd30fd7dfbe755afae4ecde1ad667c91b
Author: Tushar R. Gosavi <tus...@apache.org>
Date:   2017-01-03T07:16:33Z

APEXCORE-294 controlled shutdown of an application.






[GitHub] apex-core pull request #446: APEXCORE-610 Avoid multiple calls to getBytes.

2017-01-17 Thread tushargosavi
GitHub user tushargosavi opened a pull request:

https://github.com/apache/apex-core/pull/446

APEXCORE-610 Avoid multiple calls to getBytes.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tushargosavi/apex-core APEXCORE-610

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/446.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #446


commit 44bc4b7a58298e228e749b58288e30187736ae86
Author: Tushar R. Gosavi <tus...@apache.org>
Date:   2017-01-17T08:38:13Z

APEXCORE-610 Avoid multiple calls to getBytes.






[GitHub] apex-core pull request #439: APEXCORE-595: Don't update committedWindowId wh...

2017-01-03 Thread tushargosavi
GitHub user tushargosavi opened a pull request:

https://github.com/apache/apex-core/pull/439

APEXCORE-595: Don't update committedWindowId when all partitions are 
removed.

@tweise please review.
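One reading of the rule in this title: the committed window is the minimum over the checkpoints of the remaining partitions. Removing every partition then leaves an empty set, and a naive minimum would produce a meaningless value. The minimal sketch below follows that assumption. `CommittedWindowTracker` is hypothetical and is not the engine's actual code.

```java
import java.util.Collection;
import java.util.OptionalLong;

// Hypothetical tracker: the committed window only moves when at least one partition exists.
final class CommittedWindowTracker {
  private long committedWindowId;

  CommittedWindowTracker(long initialWindowId) {
    this.committedWindowId = initialWindowId;
  }

  // Recomputes the committed window as the minimum checkpoint of the current partitions.
  // When all partitions have been removed, the previous value is kept unchanged.
  long update(Collection<Long> partitionCheckpoints) {
    OptionalLong min = partitionCheckpoints.stream().mapToLong(Long::longValue).min();
    if (min.isPresent()) {
      committedWindowId = min.getAsLong();
    }
    return committedWindowId;
  }
}
```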

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tushargosavi/apex-core APEXCORE-595

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/439.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #439


commit b5c8e4bee839018867738f11ffe18ffa26b695d7
Author: Tushar R. Gosavi <tus...@apache.org>
Date:   2017-01-03T10:08:45Z

APEXCORE-595: Don't update committedWindowId when all partitions are
removed.






[GitHub] apex-core pull request #387: APEXCORE-525 fail validation if stateful strea...

2016-11-29 Thread tushargosavi
Github user tushargosavi closed the pull request at:

https://github.com/apache/apex-core/pull/387




[GitHub] apex-core pull request #424: APEXCORE-294 controlled shutdown of an applicat...

2016-11-25 Thread tushargosavi
GitHub user tushargosavi opened a pull request:

https://github.com/apache/apex-core/pull/424

APEXCORE-294 controlled shutdown of an application.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tushargosavi/apex-core APEXCORE-294

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/424.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #424


commit 82679e0f65a97d7686811539e9506b529e26ee42
Author: Tushar R. Gosavi <tus...@apache.org>
Date:   2016-11-25T05:31:23Z

APEXCORE-294 controlled shutdown of an application.






[GitHub] apex-core pull request #422: APEXCORE-575 Improve application restart time.

2016-11-24 Thread tushargosavi
GitHub user tushargosavi opened a pull request:

https://github.com/apache/apex-core/pull/422

APEXCORE-575 Improve application restart time.

I have implemented a new storage agent (CascadeStorageAgent) that maintains two storage agents: one for the old checkpoint directory and one for the new checkpoint directory. With this storage agent, reads during the initial start are directed to the old directory, while writes go to the new checkpoint directory. This avoids copying checkpoints from the old directory to the new one. With 2 GB of state, application restart time was brought down from 2 minutes to a few seconds.

Other changes:
- Add a log message to print the time taken to copy the initial state.
- Do not copy the stats and events directories, as they are overwritten in the new application anyway.
- Use the new storage agent to avoid copying the checkpoints directory.
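A minimal sketch of the cascading idea follows. It assumes a simplified, hypothetical `CheckpointStore` interface keyed by operator id and window id; the real Apex `StorageAgent` interface differs, so check it before relying on any signature. Writes go to the new store, and reads fall back to the old store when the new one has no entry. Nothing is copied up front.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified checkpoint store keyed by (operatorId, windowId).
interface CheckpointStore {
  void save(Object state, int operatorId, long windowId);
  Object load(int operatorId, long windowId); // returns null when absent
}

// In-memory store used only to make the sketch runnable.
final class MemoryStore implements CheckpointStore {
  private final Map<String, Object> data = new HashMap<>();

  private static String key(int operatorId, long windowId) {
    return operatorId + "/" + windowId;
  }

  @Override
  public void save(Object state, int operatorId, long windowId) {
    data.put(key(operatorId, windowId), state);
  }

  @Override
  public Object load(int operatorId, long windowId) {
    return data.get(key(operatorId, windowId));
  }
}

// Cascading store: writes go to the new location; reads try the new location first
// and fall back to the old (read-only) location, so old checkpoints need not be copied.
final class CascadeStore implements CheckpointStore {
  private final CheckpointStore parent;  // old checkpoint directory, never written
  private final CheckpointStore current; // new checkpoint directory

  CascadeStore(CheckpointStore parent, CheckpointStore current) {
    this.parent = Objects.requireNonNull(parent);
    this.current = Objects.requireNonNull(current);
  }

  @Override
  public void save(Object state, int operatorId, long windowId) {
    current.save(state, operatorId, windowId);
  }

  @Override
  public Object load(int operatorId, long windowId) {
    Object state = current.load(operatorId, windowId);
    return state != null ? state : parent.load(operatorId, windowId);
  }
}
```

The old directory is only ever read. Restart cost therefore no longer scales with the size of the checkpointed state.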


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tushargosavi/apex-core restart_optimizations

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/422.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #422


commit edc02612efbb786243e6a0188d40fe1e08ac941b
Author: Tushar R. Gosavi <tus...@apache.org>
Date:   2016-11-18T18:06:15Z

APEXCORE-575 Improve application restart time.






[GitHub] apex-core pull request #410: APEXCORE-408: Ability to schedule Sub-DAG from ...

2016-10-14 Thread tushargosavi
GitHub user tushargosavi opened a pull request:

https://github.com/apache/apex-core/pull/410

APEXCORE-408: Ability to schedule Sub-DAG from running application.

Pull request for dynamic DAG modification through stats listeners. It provides the following functionality:

- StatsListener can access the operator name, making it easy to detect which operator's stats are being processed.
- StatsListener can create an instance of an object through which it can submit DAG modifications to the engine.
- StatsListener can return DAG changes to the engine as a response.
- PlanModifier is modified to take a DAG, apply it on the existing running DAG, and deploy the changes.

The following functionality is not working yet:

- The new operator does not start from the correct windowId (https://issues.apache.org/jira/browse/APEXCORE-532).
- A relaunched application failed to start when it was killed after a dynamic DAG modification.
- There is no support for resuming removed operators from their previous state. This could be achieved by reading state from external storage on setup.
- Persist operator support is not present for newly added streams.
- Not all parts are covered by tests.

The demo application using the feature is available at
https://github.com/tushargosavi/apex-dynamic-scheduling

There are two variations of the WordCount application. The first variation detects the presence of new files and starts a disconnected DAG to process the data.

(https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/WordCountApp.java)

In the second application (https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/ExtendApp.java), initially only one reader operator runs in the DAG, and it provides pendingFiles as an auto-metric to the stats listener. On detecting pending files, the stats listener attaches splitter, counter, and output operators to the reader operator. Once the files are processed, the splitter, counter, and output operators are removed. They are added back if new data files are added to the directory.
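The attach/detach behavior of the second demo can be viewed as a small state machine driven by the `pendingFiles` auto-metric. This sketch models only the decision logic and is an assumption: it treats `pendingFiles == 0` as "files are processed". `ExtendDecider` and the `DagChange` values are hypothetical and are not part of the demo repository or the Apex API.

```java
// Hypothetical change requests a stats listener could return to the engine.
enum DagChange {
  NONE,
  ATTACH_PROCESSING, // add splitter, counter and output operators after the reader
  DETACH_PROCESSING  // remove them again once no files are pending
}

// Decides, per stats callback, whether the processing operators should be attached or removed.
final class ExtendDecider {
  private boolean attached;

  DagChange onPendingFiles(long pendingFiles) {
    if (pendingFiles > 0 && !attached) {
      attached = true;
      return DagChange.ATTACH_PROCESSING;
    }
    if (pendingFiles == 0 && attached) {
      attached = false;
      return DagChange.DETACH_PROCESSING;
    }
    return DagChange.NONE; // no transition: keep the current DAG shape
  }
}
```

Tracking `attached` keeps repeated stats callbacks from issuing duplicate attach or detach requests.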


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tushargosavi/incubator-apex-core APEXCORE-408

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/410.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #410


commit 5cecd029463de021469afb20650e3afbf75fd088
Author: Tushar R. Gosavi <tus...@apache.org>
Date:   2016-08-08T10:24:41Z

APEXCORE-408: Ability to schedule Sub-DAG from running application.






[GitHub] apex-core pull request #393: APEXCORE-408 Ability to schedule Sub-DAG from r...

2016-10-14 Thread tushargosavi
Github user tushargosavi closed the pull request at:

https://github.com/apache/apex-core/pull/393




[GitHub] apex-core pull request #361: APEXCORE-496 make operator name available to St...

2016-10-08 Thread tushargosavi
Github user tushargosavi closed the pull request at:

https://github.com/apache/apex-core/pull/361




[GitHub] apex-core pull request #402: APEXCORE-532: Fix issue where new operators add...

2016-10-03 Thread tushargosavi
GitHub user tushargosavi opened a pull request:

https://github.com/apache/apex-core/pull/402

APEXCORE-532: Fix issue where new operators added to the DAG start from the initial checkpoint

When new operators are added dynamically, they should start from the windowId of the upstream operator. The issue was that new operators are added using addLogicalOperator, which sets recoveryWindowId to INITIAL_CHECKPOINT. This prevents getActivationCheckpoint from returning the correct window id for the operator during deploy.

@PramodSSImmaneni please review.
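The fix can be illustrated with a simplified activation-window rule. An operator whose recovery window is still the initial checkpoint, because it was added dynamically, takes its starting window from its upstream operators. This sketch rests on stated assumptions. `OperatorState`, the sentinel value `-1` for `INITIAL_CHECKPOINT`, the "minimum over upstream" rule, and `activationWindow` are all hypothetical. They do not mirror the engine's `getActivationCheckpoint` implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified operator state for illustrating activation windows.
final class OperatorState {
  static final long INITIAL_CHECKPOINT = -1L; // assumed sentinel value

  final String name;
  long recoveryWindowId = INITIAL_CHECKPOINT;
  final List<OperatorState> upstream = new ArrayList<>();

  OperatorState(String name) {
    this.name = name;
  }

  // Returns the window an operator should start from. A dynamically added operator
  // still has INITIAL_CHECKPOINT, so it inherits the smallest activation window of its
  // upstream operators instead of replaying from the very beginning.
  long activationWindow() {
    if (recoveryWindowId != INITIAL_CHECKPOINT || upstream.isEmpty()) {
      return recoveryWindowId;
    }
    long min = Long.MAX_VALUE;
    for (OperatorState up : upstream) {
      min = Math.min(min, up.activationWindow());
    }
    return min;
  }
}
```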

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tushargosavi/incubator-apex-core APEXCORE-532

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/402.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #402


commit 30091217dced6d1581a8630374c942a811ab6ed2
Author: Tushar R. Gosavi <tus...@apache.org>
Date:   2016-10-03T07:07:59Z

APEXCORE-532: Fix issue where new operators added to dag start from
initial checkpoint






[GitHub] apex-core pull request #393: **Review Only** Apexcore 408

2016-09-16 Thread tushargosavi
GitHub user tushargosavi opened a pull request:

https://github.com/apache/apex-core/pull/393

**Review Only** Apexcore 408

Pull request for dynamic DAG modification through stats listeners. It provides the following functionality:

- StatsListener can access the operator name, making it easy to detect which operator's stats are being processed.
- StatsListener can create an instance of an object through which it can submit DAG modifications to the engine.
- StatsListener can return DAG changes to the engine as a response.
- PlanModifier is modified to take a DAG, apply it on the existing running DAG, and deploy the changes.

The following functionality is not working yet:

- The new operator does not start from the correct windowId (https://issues.apache.org/jira/browse/APEXCORE-532).
- A relaunched application failed to start when it was killed after a dynamic DAG modification.
- There is no support for resuming removed operators from their previous state. This could be achieved by reading state from external storage on setup.
- Persist operator support is not present for newly added streams.
- Not all parts are covered by tests.

The demo application using the feature is available at
https://github.com/tushargosavi/apex-dynamic-scheduling

There are two variations of the WordCount application. The first variation detects the presence of new files and starts a disconnected DAG to process the data.

(https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/WordCountApp.java)

In the second application (https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/ExtendApp.java), initially only one reader operator runs in the DAG, and it provides pendingFiles as an auto-metric to the stats listener. On detecting pending files, the stats listener attaches splitter, counter, and output operators to the reader operator. Once the files are processed, the splitter, counter, and output operators are removed. They are added back if new data files are added to the directory.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tushargosavi/incubator-apex-core APEXCORE-408

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/393.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #393


commit 05fcca2c919e2828f8348e5e4ed60d87e044b5ac
Author: Tushar R. Gosavi <tus...@apache.org>
Date:   2016-08-08T10:24:41Z

APEXCORE-496: Provide way to access dag elements for statslisteners.

commit 8cf49f28a7732787c2517a5b345e4690be517280
Author: Tushar R. Gosavi <tus...@apache.org>
Date:   2016-07-05T08:40:26Z

APEXCORE-408: Dynamic dag changes in Apex through StatsListeners






[GitHub] apex-core pull request #387: APEXCORE-525 fail validation if stateful strea...

2016-09-12 Thread tushargosavi
GitHub user tushargosavi opened a pull request:

https://github.com/apache/apex-core/pull/387

APEXCORE-525 fail validation if stateful streamcodec does not implement newInstance method.

If a user implements a stream codec extending DefaultStatefulStreamCodec, they also need to remember to implement the newInstance method, because the platform uses the object returned by newInstance as the codec. If the user forgets to override newInstance, the platform keeps using the default codec without any warning to the user. Such errors are caught very late and usually result in incorrect data distribution and incorrect results at the final operator.
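The validation can be expressed with plain reflection. Look up the public `newInstance` method on the user's codec class and fail if the base class still declares it. This is a sketch only. `BaseStatefulCodec` is a hypothetical stand-in for `DefaultStatefulStreamCodec`, and treating `newInstance` as a public no-argument method is an assumption that may not match the real class.

```java
// Hypothetical stand-in for DefaultStatefulStreamCodec.
class BaseStatefulCodec {
  public BaseStatefulCodec newInstance() {
    return new BaseStatefulCodec();
  }
}

// A subclass that forgets to override newInstance (the bug the validation should catch).
class ForgetfulCodec extends BaseStatefulCodec {
}

// A subclass that correctly overrides newInstance.
class CorrectCodec extends BaseStatefulCodec {
  @Override
  public BaseStatefulCodec newInstance() {
    return new CorrectCodec();
  }
}

final class CodecValidator {
  // Throws if a strict subclass of BaseStatefulCodec inherits newInstance() unchanged.
  static void validate(Class<? extends BaseStatefulCodec> codecClass) {
    if (codecClass == BaseStatefulCodec.class) {
      return; // the base codec itself is fine
    }
    try {
      Class<?> declaring = codecClass.getMethod("newInstance").getDeclaringClass();
      if (declaring == BaseStatefulCodec.class) {
        throw new IllegalArgumentException(
            codecClass.getName() + " must override newInstance()");
      }
    } catch (NoSuchMethodException e) {
      throw new IllegalStateException("newInstance() not found", e);
    }
  }
}
```

This check only confirms that some class below the base overrides `newInstance`. A further subclass that inherits an intermediate override would still pass.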



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tushargosavi/incubator-apex-core APEXCORE-525

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/387.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #387


commit 8ab11fae86d608d5fb4a3e06153cee056b4cb5d5
Author: Tushar R. Gosavi <tus...@apache.org>
Date:   2016-09-12T06:43:06Z

APEXCORE-525: fail validation if stateful streamcodec does not
implement newInstance method.






[GitHub] apex-core pull request #361: APEXCORE-496 make operator name available to St...

2016-08-10 Thread tushargosavi
Github user tushargosavi commented on a diff in the pull request:

https://github.com/apache/apex-core/pull/361#discussion_r74229007
  
--- Diff: api/src/main/java/com/datatorrent/api/StatsListener.java ---
@@ -115,6 +115,28 @@
 List getOperatorResponse();
   }
 
+  /**
+   * An interface to the DAG. Stats listener can get information about
+   * operator or other elements in the DAG through this interface. currerntly
+   * we only provide method to extract the operator name based on the physical
+   * id of the operator. In future more methods can be added.
+   *
+   */
--- End diff --

Exposing all information may expose the internal implementation of the DAG, and
we can only depend on apex-api. Can you suggest which information can be provided
to the stats listener without adding a dependency on apex-engine? I can think of
the following:
- name
- number of partitions for itself
- aggregated stats at the logical operator level
This list could be expanded in the future.

This JIRA puts the required infrastructure in place without breaking backward
compatibility.
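One way to add such infrastructure without breaking existing listeners, sketched under assumptions: `StatsListenerSketch`, `DagContext`, `ContextAwareStatsListener` and `EngineDispatcher` are hypothetical names, not the Apex API. The context interface exposes only plain values (here, the operator name), so a listener needs nothing beyond the API module, and the engine passes the context only to listeners that opt in through a separate interface:

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for the existing listener contract, left unchanged. */
interface StatsListenerSketch
{
  void processStats(int operatorId);
}

/** Read-only view of the DAG; exposes plain values only, no engine classes. */
interface DagContext
{
  String getOperatorName(int operatorId);
}

/** New opt-in contract; existing StatsListenerSketch implementations keep working. */
interface ContextAwareStatsListener
{
  void processStats(int operatorId, DagContext context);
}

/** Engine-side dispatch: only listeners that opt in receive the context. */
final class EngineDispatcher
{
  private final Map<Integer, String> operatorNames = new HashMap<>();
  private final DagContext context = id -> operatorNames.get(id);

  void registerOperator(int operatorId, String name)
  {
    operatorNames.put(operatorId, name);
  }

  void dispatch(Object listener, int operatorId)
  {
    if (listener instanceof ContextAwareStatsListener) {
      ((ContextAwareStatsListener)listener).processStats(operatorId, context);
    } else if (listener instanceof StatsListenerSketch) {
      ((StatsListenerSketch)listener).processStats(operatorId);
    }
  }
}
```

Further items from the list above (partition count, aggregated logical stats) would become additional methods on the context interface, which only the engine implements.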




[GitHub] apex-core issue #361: APEXCORE-496 make operator name available to StatsList...

2016-08-08 Thread tushargosavi
Github user tushargosavi commented on the issue:

https://github.com/apache/apex-core/pull/361
  
@vrozov @sandeshh I have updated the pull request, please review.




[GitHub] apex-malhar pull request #351: APEXMALHAR-2169 Fixed the issue of dynamic pa...

2016-08-05 Thread tushargosavi
Github user tushargosavi commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/351#discussion_r73647526
  
--- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---
@@ -188,6 +188,8 @@
   @Min(1)
   private int initialPartitionCount = 1;
 
+  private boolean isPartitionBasedOnLoad = false;
--- End diff --

Add a comment about this flag.




[GitHub] apex-core pull request #361: APEXCORE-496 make operator name available to St...

2016-08-05 Thread tushargosavi
Github user tushargosavi commented on a diff in the pull request:

https://github.com/apache/apex-core/pull/361#discussion_r73645793
  
--- Diff: api/src/main/java/com/datatorrent/api/StatsListener.java ---
@@ -113,6 +113,8 @@
 long getLatencyMA();
 
 List getOperatorResponse();
+
+String getOperatorName();
--- End diff --

The stats listener only has access to BatchedOperatorStats, hence I added it
there. I am closing this pull request now and will make changes as per the
discussion on the dev thread.




[GitHub] apex-malhar pull request #282: APEXMALHAR-2066 JdbcPolling,idempotent,partit...

2016-07-13 Thread tushargosavi
Github user tushargosavi commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/282#discussion_r70569998
  
--- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---
@@ -0,0 +1,656 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.validation.constraints.Min;
+
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.Operator.IdleTimeHandler;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.lib.db.AbstractStoreInputOperator;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Abstract operator for for consuming data using JDBC interface
+ * User needs User needs to provide
+ * tableName,dbConnection,setEmitColumnList,look-up key 
+ * Optionally batchSize,pollInterval,Look-up key and a where clause can be given
+ * 
+ * This operator uses static partitioning to arrive at range queries for exactly
+ * once reads
+ * Assumption is that there is an ordered column using which range queries can
+ * be formed
+ * If an emitColumnList is provided, please ensure that the keyColumn is the
+ * first column in the list
+ * Range queries are formed using the {@link JdbcMetaDataUtility}} Output -
+ * comma separated list of the emit columns eg columnA,columnB,columnC
+ * 
+ * In the next iterations this operator would support an in-clause for
+ * idempotency instead of having only range query support to support non ordered
+ * key columns
+ * 
+ * @displayName Jdbc Polling Input Operator
+ * @category Input
+ * @tags database, sql, jdbc, partitionable,exactlyOnce
+ */
+@Evolving
+public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator<T, JdbcStore>
+implements ActivationListener, IdleTimeHandler, Partitioner<AbstractJdbcPollInputOperator>
+{
+  /**
+   * poll interval in milliseconds
+   */
+  private static int pollInterval = 1;
+
+  @Min(1)
+  private int partitionCount = 1;
+  protected transient int operatorId;
+  protected transient boolean isReplayed;
+  protected transient boolean isPollable;
+  protected int batchSize;
+  protected static int fetchSize = 2;
+  /**
+   * Map of windowId to  of the range key
+   */
+  protected transient MutablePair<String, String> currentWindowRecoveryState;
+
+  /**
+   * size of the emit queue used to hold polled records before emit
+   */
+  private static int queueCapacity = 4 * 1024 * 1024;
+  private transient volatile boolean execute;
+  private transient AtomicReference cause;
+  protected transient int spinMillis;
+  private transient OperatorContext context;
+  protected String tableName;
+  protected String key;
+  protected long currentWindowId;
+  protected KeyValPair<String, String> r
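
The range-query idea in the Javadoc above can be sketched as follows, assuming a numeric, ordered key column whose current minimum and maximum are already known (the operator would obtain them from the database; that step is omitted). `KeyRangeSplitter` and `rangeQueries` are hypothetical names, not part of Malhar. Each partition gets a disjoint half-open range, so every existing row is read by exactly one partition and a replayed window can re-issue the same bounded query:

```java
import java.util.ArrayList;
import java.util.List;

final class KeyRangeSplitter
{
  private KeyRangeSplitter()
  {
  }

  /**
   * Splits [minKey, maxKey] into at most partitionCount disjoint, half-open ranges
   * and returns one SQL range query per non-empty range. Table and column names are
   * assumed to come from trusted operator configuration, not from tuple data.
   */
  static List<String> rangeQueries(String table, String keyColumn, String emitColumns,
      long minKey, long maxKey, int partitionCount)
  {
    if (partitionCount < 1 || maxKey < minKey) {
      throw new IllegalArgumentException("need partitionCount >= 1 and minKey <= maxKey");
    }
    long keyCount = maxKey - minKey + 1;                          // keys in [minKey, maxKey]
    long step = (keyCount + partitionCount - 1) / partitionCount; // ceiling division
    List<String> queries = new ArrayList<>();
    for (int i = 0; i < partitionCount; i++) {
      long lower = minKey + i * step;
      if (lower > maxKey) {
        break; // more partitions than distinct keys: skip empty ranges
      }
      long upper = Math.min(lower + step, maxKey + 1);            // exclusive upper bound
      queries.add(String.format("SELECT %s FROM %s WHERE %s >= %d AND %s < %d ORDER BY %s",
          emitColumns, table, keyColumn, lower, keyColumn, upper, keyColumn));
    }
    return queries;
  }
}
```

For keys 1 to 10 and three partitions this yields the ranges [1, 5), [5, 9) and [9, 11). A polling operator would presumably also need an open-ended range to pick up rows inserted later; that is left out of this sketch.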

[GitHub] apex-malhar pull request #330: Initial cut of Inner Join operator for REVIEW...

2016-07-08 Thread tushargosavi
Github user tushargosavi commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r70043528
  
--- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java ---
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.join.managed.ManagedSpillableComplexComponent;
+import com.datatorrent.lib.join.managed.ManagedTimeStateMultiMap;
+import com.datatorrent.netlet.util.Slice;
+
+public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends AbstractInnerJoinOperator<K,T> implements
+Operator.CheckpointNotificationListener, Operator.CheckpointListener,Operator.IdleTimeHandler
+{
+  private static final transient Logger LOG = LoggerFactory.getLogger(AbstractManagedStateInnerJoinOperator.class);
+  public static final String stateDir = "managedState";
+  public static final String stream1State = "stream1Data";
+  public static final String stream2State = "stream2Data";
+  private transient long sleepMillis;
+  private transient Map<JoinEvent<K,T>, Future> waitingEvents = Maps.newLinkedHashMap();
+  private int noOfBuckets = 1;
+  private Long bucketSpanTime;
+  protected ManagedTimeStateImpl stream1Store;
+  protected ManagedTimeStateImpl stream2Store;
+
+  @Override
+  public void createStores()
+  {
+stream1Store = new ManagedTimeStateImpl();
+stream2Store = new ManagedTimeStateImpl();
+stream1Store.setNumBuckets(noOfBuckets);
+stream2Store.setNumBuckets(noOfBuckets);
+if (bucketSpanTime != null) {
+  stream1Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime));
+  stream2Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime));
+}
+
+if (getExpiryTime() != null) {
+  stream1Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime()));
+  stream2Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime()));
+}
+
+component = new ManagedSpillableComplexComponent();
+stream1Data = ((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream1Store, isStream1KeyPrimary());
+stream2Data = ((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream2Store, isStream2KeyPrimary());
+  }
+
+  @Override
+  protected void processTuple(T tuple, boolean isStream1Data)
+  {
+Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data;
+K key = extractKey(tuple,isStream1Data);
+long timeBucket = extractTime(tuple,isStream1Data);
+((ManagedTimeStateMultiMap)store).put(key, tuple,timeBucket);
+joinStream(key,tuple,isStream1Data);
+  }
+
+  @Override
+  protected void joinStream(K key, T tuple, boolean isStream1Data)
+  {
+Spillable.SpillableByteArrayListMultimap<K, T> store = isStream1Data ? stream2Data : stream1Data;
+Future future = ((ManagedTimeStateMultiMap)store).getAsync(ke

[GitHub] apex-malhar pull request #330: Initial cut of Inner Join operator for REVIEW...

2016-07-08 Thread tushargosavi
Github user tushargosavi commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r70043086
  
--- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java ---
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.BaseOperator;
+
+public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator
+{
+  protected transient String[][] includeFields;
+  protected transient List keyFields;
+  protected transient List timeFields;
+
+  private Long expiryTime;
+  protected SpillableComplexComponent component;
+  protected Spillable.SpillableByteArrayListMultimap<K,T> stream1Data;
+  protected Spillable.SpillableByteArrayListMultimap<K,T> stream2Data;
+  private boolean isStream1KeyPrimary = false;
+  private boolean isStream2KeyPrimary = false;
+
+  @NotNull
+  private String keyFieldsStr;
+  @NotNull
+  private String includeFieldStr;
+  private String timeFieldsStr;
+
+  protected void processTuple(T tuple, boolean isStream1Data)
+  {
+Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data;
+K key = extractKey(tuple,isStream1Data);
+store.put(key, tuple);
+joinStream(key,tuple,isStream1Data);
+  }
+
+  protected void joinStream(K key, T tuple, boolean isStream1Data)
+  {
+Spillable.SpillableByteArrayListMultimap<K, T> store = isStream1Data ? stream2Data : stream1Data;
+List value = store.get(key);
+
+// Join the input tuple with the joined tuples
+if (value != null) {
+  for (T joinedValue : value) {
+T result = isStream1Data ? joinTuples(Arrays.asList(tuple, joinedValue)) :
--- End diff --

Why a list as the argument to joinTuples?
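If joinTuples took two arguments instead of a list, the join loop could keep a fixed (stream1, stream2) order without building a list for every match. A minimal sketch, assuming a hypothetical `PairJoiner` helper with a `BiFunction` join function; this is not the operator's actual signature:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

/** Hypothetical helper: joins one incoming tuple with its matches from the other stream. */
final class PairJoiner<T, R>
{
  private final BiFunction<T, T, R> joinTuples; // always called as (stream1Tuple, stream2Tuple)

  PairJoiner(BiFunction<T, T, R> joinTuples)
  {
    this.joinTuples = joinTuples;
  }

  List<R> join(T incoming, List<T> matches, boolean incomingIsStream1)
  {
    List<R> results = new ArrayList<>(matches.size());
    for (T match : matches) {
      // Swap arguments when needed so the stream1 tuple is always on the left.
      results.add(incomingIsStream1 ? joinTuples.apply(incoming, match) : joinTuples.apply(match, incoming));
    }
    return results;
  }
}
```

A two-argument function also makes the expected arity explicit at compile time, whereas a list parameter accepts any number of elements.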




[GitHub] apex-malhar pull request #330: Initial cut of Inner Join operator for REVIEW...

2016-07-08 Thread tushargosavi
Github user tushargosavi commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r70042536
  
--- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java ---
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.BaseOperator;
+
+public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator
+{
+  protected transient String[][] includeFields;
+  protected transient List keyFields;
+  protected transient List timeFields;
+
+  private Long expiryTime;
+  protected SpillableComplexComponent component;
+  protected Spillable.SpillableByteArrayListMultimap<K,T> stream1Data;
+  protected Spillable.SpillableByteArrayListMultimap<K,T> stream2Data;
+  private boolean isStream1KeyPrimary = false;
+  private boolean isStream2KeyPrimary = false;
+
+  @NotNull
+  private String keyFieldsStr;
+  @NotNull
+  private String includeFieldStr;
+  private String timeFieldsStr;
+
+  protected void processTuple(T tuple, boolean isStream1Data)
--- End diff --

Instead of a boolean, can you pass the store directly?
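Passing the stores directly could look like the minimal sketch below. It uses hypothetical in-memory `HashMap`s in place of the spillable multimaps and ignores expiry and managed state; `InMemoryInnerJoin` and its methods are illustrative names only. Each input calls `processTuple` with its own store and the other stream's store, and the stream-2 path swaps the emitted pair so output is always (stream1, stream2):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Hypothetical in-memory inner join; HashMaps stand in for the spillable stores. */
final class InMemoryInnerJoin<K, T>
{
  private final Map<K, List<T>> stream1Data = new HashMap<>();
  private final Map<K, List<T>> stream2Data = new HashMap<>();
  private final Function<T, K> extractKey;
  private final BiConsumer<T, T> emit; // receives (stream1Tuple, stream2Tuple)

  InMemoryInnerJoin(Function<T, K> extractKey, BiConsumer<T, T> emit)
  {
    this.extractKey = extractKey;
    this.emit = emit;
  }

  void processStream1(T tuple)
  {
    processTuple(tuple, stream1Data, stream2Data, emit);
  }

  void processStream2(T tuple)
  {
    processTuple(tuple, stream2Data, stream1Data, (mine, other) -> emit.accept(other, mine));
  }

  /** Stores the tuple in its own stream's store and joins it against the other store. */
  private void processTuple(T tuple, Map<K, List<T>> ownStore, Map<K, List<T>> otherStore,
      BiConsumer<T, T> sink)
  {
    K key = extractKey.apply(tuple);
    ownStore.computeIfAbsent(key, k -> new ArrayList<>()).add(tuple);
    for (T match : otherStore.getOrDefault(key, Collections.emptyList())) {
      sink.accept(tuple, match);
    }
  }
}
```

With the stores as parameters, processTuple no longer needs a ternary on a flag to pick which store to write and which to probe.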




[GitHub] apex-malhar pull request #330: Initial cut of Inner Join operator for REVIEW...

2016-07-08 Thread tushargosavi
Github user tushargosavi commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r70042432
  
--- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java ---
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.join.managed.ManagedSpillableComplexComponent;
+import com.datatorrent.lib.join.managed.ManagedTimeStateMultiMap;
+import com.datatorrent.netlet.util.Slice;
+
+public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends AbstractInnerJoinOperator<K,T> implements
+Operator.CheckpointNotificationListener, Operator.CheckpointListener,Operator.IdleTimeHandler
+{
+  private static final transient Logger LOG = LoggerFactory.getLogger(AbstractManagedStateInnerJoinOperator.class);
+  public static final String stateDir = "managedState";
+  public static final String stream1State = "stream1Data";
+  public static final String stream2State = "stream2Data";
+  private transient long sleepMillis;
+  private transient Map<JoinEvent<K,T>, Future> waitingEvents = Maps.newLinkedHashMap();
+  private int noOfBuckets = 1;
+  private Long bucketSpanTime;
+  protected ManagedTimeStateImpl stream1Store;
+  protected ManagedTimeStateImpl stream2Store;
+
+  @Override
+  public void createStores()
+  {
+stream1Store = new ManagedTimeStateImpl();
+stream2Store = new ManagedTimeStateImpl();
+stream1Store.setNumBuckets(noOfBuckets);
+stream2Store.setNumBuckets(noOfBuckets);
+if (bucketSpanTime != null) {
+  stream1Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime));
+  stream2Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime));
+}
+
+if (getExpiryTime() != null) {
--- End diff --

Can it be different for each store?
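If the two streams need different retention, one option is a per-stream override that falls back to the shared expiry time. A minimal sketch; `JoinExpiryConfig` and the fields `stream1ExpiryTime` and `stream2ExpiryTime` are hypothetical, and `null` keeps the PR's meaning of "no expiry configured":

```java
/** Hypothetical expiry settings for a two-stream join; values are in milliseconds. */
final class JoinExpiryConfig
{
  private Long expiryTime;        // shared default, as in the PR
  private Long stream1ExpiryTime; // optional override for stream 1
  private Long stream2ExpiryTime; // optional override for stream 2

  void setExpiryTime(Long expiryTime)
  {
    this.expiryTime = expiryTime;
  }

  void setStream1ExpiryTime(Long stream1ExpiryTime)
  {
    this.stream1ExpiryTime = stream1ExpiryTime;
  }

  void setStream2ExpiryTime(Long stream2ExpiryTime)
  {
    this.stream2ExpiryTime = stream2ExpiryTime;
  }

  /** Effective expiry for stream 1, or null if neither value is set. */
  Long getStream1ExpiryTime()
  {
    return stream1ExpiryTime != null ? stream1ExpiryTime : expiryTime;
  }

  /** Effective expiry for stream 2, or null if neither value is set. */
  Long getStream2ExpiryTime()
  {
    return stream2ExpiryTime != null ? stream2ExpiryTime : expiryTime;
  }
}
```

createStores() would then test each effective value separately before calling setExpireBefore on the matching store's time bucket assigner.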




[GitHub] apex-core pull request #351: APEXCORE-405 Common API to launch on local mode...

2016-07-01 Thread tushargosavi
Github user tushargosavi commented on a diff in the pull request:

https://github.com/apache/apex-core/pull/351#discussion_r69280615
  
--- Diff: engine/src/main/java/com/datatorrent/stram/LocalModeImpl.java ---
@@ -54,6 +55,53 @@ public DAG cloneDAG() throws Exception
   }
 
   @Override
+  public LocalAppHandle launchApp(StreamingApplication application, Configuration configuration, Attribute.AttributeMap
+  launchAttributes) throws LaunchException
+  {
+try {
+  //application.populateDAG(getDAG(), configuration == null ? new Configuration(false) : configuration);
--- End diff --

Remove the commented-out code.




[GitHub] apex-malhar pull request #318: APEXMALHAR-2119 add setters for partition cou...

2016-06-21 Thread tushargosavi
Github user tushargosavi commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/318#discussion_r67813977
  
--- Diff: library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java ---
@@ -1110,6 +1116,16 @@ public String toString()
   return "DirectoryScanner [filePatternRegexp=" + filePatternRegexp + " partitionIndex=" +
   partitionIndex + " partitionCount=" + partitionCount + "]";
 }
+
+protected void setPartitionIndex(int partitionIndex)
+{
+  this.partitionIndex = partitionIndex;
+}
+
+protected void setPartitionCount(int partitionCount)
+{
+  this.partitionCount = partitionCount;
+}
--- End diff --

Added a test for setPartition and verified that the base function uses the value
that was set.




[GitHub] apex-malhar pull request #318: APEXMALHAR-2119 add setters for partition cou...

2016-06-20 Thread tushargosavi
Github user tushargosavi commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/318#discussion_r67808791
  
--- Diff: library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java ---
@@ -1110,6 +1116,16 @@ public String toString()
   return "DirectoryScanner [filePatternRegexp=" + filePatternRegexp + " partitionIndex=" +
   partitionIndex + " partitionCount=" + partitionCount + "]";
 }
+
+protected void setPartitionIndex(int partitionIndex)
+{
+  this.partitionIndex = partitionIndex;
+}
+
+protected void setPartitionCount(int partitionCount)
+{
+  this.partitionCount = partitionCount;
+}
--- End diff --

With the kryo.clone change in createPartition, the values are now set on the
derived object from there. Initially createPartition created an object of type
DirectoryScanner, so a derived class's only option was to override it, create an
object of its own type, and then set the count and index values; but it could not,
as those fields were private without setters in the base class.
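The effect of cloning inside createPartition can be sketched as below. `Scanner` and `PatternScanner` are hypothetical stand-ins for DirectoryScanner and a subclass, and `Object.clone()` (a shallow copy) stands in for the Kryo deep clone used in the PR. Because the copy keeps the runtime class of the original, a subclass's own settings survive partitioning, and the protected setters let the index and count be assigned on the copy:

```java
/** Hypothetical stand-in for DirectoryScanner. */
class Scanner implements Cloneable
{
  private int partitionIndex;
  private int partitionCount = 1;

  protected void setPartitionIndex(int partitionIndex)
  {
    this.partitionIndex = partitionIndex;
  }

  protected void setPartitionCount(int partitionCount)
  {
    this.partitionCount = partitionCount;
  }

  int getPartitionIndex()
  {
    return partitionIndex;
  }

  int getPartitionCount()
  {
    return partitionCount;
  }

  /** Copies this scanner, keeping its runtime class, and assigns it a partition. */
  protected Scanner createPartition(int partitionIndex, int partitionCount)
  {
    Scanner copy;
    try {
      copy = (Scanner)clone(); // shallow copy; the PR uses a Kryo clone instead
    } catch (CloneNotSupportedException e) {
      throw new IllegalStateException(e); // unreachable: Scanner implements Cloneable
    }
    copy.setPartitionIndex(partitionIndex);
    copy.setPartitionCount(partitionCount);
    return copy;
  }
}

/** A subclass with its own setting that must survive partitioning. */
class PatternScanner extends Scanner
{
  String pattern = ".*\\.csv";
}
```

A subclass that still wants custom partitioning can override createPartition, call super, and adjust the copy through the same setters.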





[GitHub] apex-malhar pull request #319: REVIEW ONLY: Operator supporting the Beam con...

2016-06-20 Thread tushargosavi
Github user tushargosavi commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r67679805
  
--- Diff: stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java ---
@@ -53,4 +60,121 @@
  */
 public interface WindowedStream extends ApexStream
 {
+
+  /**
+   * Count of all tuples
+   * @return new stream of Integer
+   */
+  > STREAM count();
+
+  /**
+   * Count tuples by the key
+   * If the input is KeyedTuple it will get the key from getKey method from the tuple
+   * If not, use the tuple itself as a key
+   * @return new stream of Map
+   */
+  >> STREAM countByKey();
+
+  /**
+   *
+   * Count tuples by the indexed key
+   * @param key the index of the field in the tuple that are used as key
+   * @return new stream of Map
+   */
+  >> STREAM countByKey(int key);
+
+
+  /**
+   *
+   * Return top tuples by the selected key
+   * @return new stream of Key and top N tuple of the key
+   */
+  <TUPLE, KEY, STREAM extends ApexStream<Map.Entry<KEY, List>>> STREAM topByKey(int N);
+
+  /**
+   *
+   * Return top tuples of all tuples in the window
+   * @return new stream of Map
+   */
+  > STREAM top(int N);
+
+  <O, STREAM extends ApexStream> STREAM combineByKey();
+
+  <O, STREAM extends ApexStream> STREAM combine();
+
+  /**
+   * Reduce transformation
+   * Add an operator to the DAG which merge tuple t1, t2 to new tuple
+   * @param name operator name
+   * @param reduce reduce function
+   * @return new stream of same type
+   */
+  > STREAM reduce(String name, Function.ReduceFunction reduce);
+
+  /**
+   * Fold transformation
+   * Add an operator to the DAG which merge tuple T to accumulated result tuple O
+   * @param initialValue initial result value
+   * @param fold fold function
+   * @param  Result type
+   * @return new stream of type O
+   */
+  <O, STREAM extends ApexStream> STREAM fold(O initialValue, Function.FoldFunction<T, O> fold);
+
+  /**
+   * Fold transformation
+   * Add an operator to the DAG which merge tuple T to accumulated result tuple O
+   * @param name name of the operator
+   * @param initialValue initial result value
+   * @param fold fold function
+   * @param  Result type
+   * @return new stream of type O
+   */
+  <O, STREAM extends ApexStream> STREAM fold(String name, O initialValue, Function.FoldFunction<T, O> fold);
+
+
+  /**
+   * Fold transformation
+   * Add an operator to the DAG which merge tuple T to accumulated result tuple O
+   * @param name name of the operator
+   * @param fold fold function
+   * @param  Result type
+   * @return new stream of type O
+   */
+  <O, K, STREAM extends ApexStream<KeyValPair<K, O>>> STREAM foldByKey(String name, Function.FoldFunction<T, KeyValPair<K, O>> fold);
+
+  /**
+   * Fold transformation
+   * Add an operator to the DAG which merge tuple T to accumulated result tuple O
+   * @param fold fold function
+   * @param  Result type
+   * @return new stream of type O
+   */
+  <O, K, STREAM extends ApexStream<KeyValPair<K, O>>> STREAM foldByKey(Function.FoldFunction<T, KeyValPair<K, O>> fold);
+
+
+  /**
+   * Reduce transformation
+   * Add an operator to the DAG which merge tuple t1, t2 to new tuple
+   * @param reduce reduce function
+   * @return new stream of same type
+   */
+  > STREAM reduce(Function.ReduceFunction reduce);
--- End diff --

What about a reduceByKey variant?
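For reference, a reduceByKey would apply the reduce function separately for each key within a window, instead of across all tuples in the window as `reduce` does. A minimal sketch of those semantics over one window's tuples, using a hypothetical static helper `WindowReduce.reduceByKey` rather than the ApexStream API:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

final class WindowReduce
{
  private WindowReduce()
  {
  }

  /** Reduces the tuples of one window separately for each key. */
  static <T, K> Map<K, T> reduceByKey(List<T> windowTuples, Function<T, K> keyOf, BinaryOperator<T> reduce)
  {
    Map<K, T> result = new HashMap<>();
    for (T tuple : windowTuples) {
      // merge() stores the tuple for a new key, otherwise combines it with the current value.
      // Note: if reduce returns null, merge() removes the key.
      result.merge(keyOf.apply(tuple), tuple, reduce);
    }
    return result;
  }
}
```

For example, reducing 1 to 5 keyed by parity with `Integer::sum` gives 6 for the even key and 9 for the odd key.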




[GitHub] apex-malhar pull request #319: REVIEW ONLY: Operator supporting the Beam con...

2016-06-20 Thread tushargosavi
Github user tushargosavi commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r67652351
  
--- Diff: stream/src/main/java/org/apache/apex/malhar/stream/window/WindowedStorage.java ---
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.window;
+
+import com.datatorrent.api.StreamCodec;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface is for a key/value store for storing data for windowed streams.
+ * The key to this key/value store is a pair of (Window, K)
+ *
+ * Note that this interface does not take recovery into consideration.
+ *
+ */
+@InterfaceStability.Evolving
+public interface WindowedStorage<K, V>
+{
+  /**
+   * Sets the codec (serializer and deserializer) for W. This will be used when storing and retrieving the window to and
+   * from the storage. In most cases, the starting epoch millis should be used for TimedWindow and the starting tuple
+   * offset should be used for CountWindow
+   *
+   * @param codec
+   */
+  void setWindowCodec(StreamCodec codec);
+
+  /**
+   * Sets the codec (serializer and deserializer) for K. This will be used when storing and retrieving the key to and
+   * from the storage
+   *
+   * @param codec
+   */
+  void setKeyCodec(StreamCodec codec);
+
+  /**
+   * Sets the codec (serializer and deserializer) for V. This will be used when storing and retrieving the value to and
+   * from the storage
+   *
+   * @param codec
+   */
+  void setValueCodec(StreamCodec codec);
+
+  /**
+   * Sets the data associated with the given window and the key
+   *
+   * @param window
+   * @param key
+   * @param value
+   */
+  void put(Window window, K key, V value);
+
+  /**
+   * Gets the key/value pairs associated with the given window
+   *
+   * @param window
+   * @return
+   */
+  Set<Map.Entry<K, V>> entrySet(Window window);
--- End diff --

Can it return an Iterator<Map.Entry<K, V>> instead? The state may be huge and stored in the file system, in which case materializing it as a Set could be expensive.
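For comparison, the signature change suggested above could look roughly like the minimal sketch below. It is a hypothetical simplification, not the interface that was merged: `IteratingWindowedStorage`, `InMemoryIteratingWindowedStorage` and `entries` are illustrative names, a window is identified by a plain `long` id instead of the `Window` type, and the codec setters are left out. Returning an `Iterator` only commits the store to handing out entries one at a time, whereas a `Set` also promises operations such as `size()` and `contains()`, which a file-system-backed store might only be able to answer after reading all of a window's data.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified variant of WindowedStorage: a window is identified
// by a long id instead of the Window type, and the codec setters are omitted.
interface IteratingWindowedStorage<K, V>
{
  // Sets the value associated with the given window and key.
  void put(long windowId, K key, V value);

  // Hands out the window's key/value pairs one at a time instead of as a Set.
  Iterator<Map.Entry<K, V>> entries(long windowId);
}

// In-memory implementation. A file-system-backed store could honor the same
// contract by reading the next block of entries from disk inside next().
class InMemoryIteratingWindowedStorage<K, V> implements IteratingWindowedStorage<K, V>
{
  private final Map<Long, Map<K, V>> windows = new HashMap<>();

  @Override
  public void put(long windowId, K key, V value)
  {
    // LinkedHashMap keeps first-insertion order; re-putting a key overwrites its value.
    windows.computeIfAbsent(windowId, id -> new LinkedHashMap<>()).put(key, value);
  }

  @Override
  public Iterator<Map.Entry<K, V>> entries(long windowId)
  {
    Map<K, V> data = windows.get(windowId);
    if (data == null) {
      return Collections.emptyIterator();
    }
    // The unmodifiable view stops callers from changing the store via the iterator.
    return Collections.unmodifiableMap(data).entrySet().iterator();
  }
}
```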


[GitHub] apex-malhar pull request #318: APEXMALHAR-2119 add setters for partition cou...

2016-06-15 Thread tushargosavi
GitHub user tushargosavi opened a pull request:

https://github.com/apache/apex-malhar/pull/318

APEXMALHAR-2119 add setters for partition count and index.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tushargosavi/incubator-apex-malhar APEXMALHAR-2119

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/318.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #318


commit 82cfbbbcca80c4f6eb4d76cb35a6fb5402db0d58
Author: Tushar R. Gosavi <tus...@apache.org>
Date:   2016-06-15T11:42:16Z

APEXMALHAR-2119 add setters for partition count and index.




[GitHub] incubator-apex-malhar pull request: APEXMALHAR-998 extend second type parame...

2016-06-01 Thread tushargosavi
GitHub user tushargosavi opened a pull request:

https://github.com/apache/incubator-apex-malhar/pull/305

APEXMALHAR-998 extend second type parameter from Object



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tushargosavi/incubator-apex-malhar APEXMALHAR-998

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-apex-malhar/pull/305.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #305


commit e809bee20b0761e32bf4b1213e0dafa2791178de
Author: Tushar R. Gosavi <tus...@apache.org>
Date:   2016-06-01T09:32:27Z

APEXMALHAR-998 extend second type parameter from Object




[GitHub] incubator-apex-core pull request: *Review only* Prototype of runni...

2016-05-24 Thread tushargosavi
Github user tushargosavi commented on a diff in the pull request:

https://github.com/apache/incubator-apex-core/pull/340#discussion_r64374984
  
--- Diff: engine/src/main/java/com/datatorrent/stram/StramUtils.java ---
@@ -133,4 +135,37 @@ public static JSONObject getStackTrace()
     return jsonObject;
   }
 
+  public static JSONObject getStackTrace2() throws JSONException
+  {
+
+    String result = "hello";
+    try {
+
+      String processName = java.lang.management.ManagementFactory.getRuntimeMXBean().getName();
+      Long processId = Long.parseLong(processName.split("@")[0]);
+
+      String cmd = "jstack " + processId;
--- End diff --

jstack needs to be in the PATH, otherwise the above command might fail. We also need to make sure that this tool works with different JVM implementations.
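If depending on an external `jstack` binary is a concern, one alternative is to collect the stack traces in-process through the standard `java.lang.management.ThreadMXBean` API, which needs no PATH lookup and no separate process. This is a different technique from the jstack call in the diff and is offered only as a sketch: `InProcessThreadDump` and `dumpAllThreads()` are hypothetical names, and the sketch returns a plain `String`, leaving out the `JSONObject` wrapping that `StramUtils` uses. Reporting locked monitors and synchronizers is an optional feature of the management API, so the code asks the JVM before requesting them.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper, not part of StramUtils: builds a jstack-like thread
// dump from inside the running JVM, so no external binary or PATH is involved.
final class InProcessThreadDump
{
  private InProcessThreadDump()
  {
  }

  static String dumpAllThreads()
  {
    ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
    // Lock details are optional features of the management API; request them
    // only if this JVM reports support, otherwise dumpAllThreads throws.
    ThreadInfo[] infos = threadMXBean.dumpAllThreads(
        threadMXBean.isObjectMonitorUsageSupported(),
        threadMXBean.isSynchronizerUsageSupported());

    StringBuilder sb = new StringBuilder();
    for (ThreadInfo info : infos) {
      if (info == null) {
        continue; // defensive: skip any missing entry
      }
      sb.append('"').append(info.getThreadName()).append("\" id=")
          .append(info.getThreadId()).append(' ')
          .append(info.getThreadState()).append('\n');
      // ThreadInfo.toString() prints only a limited number of frames,
      // so every frame is formatted here explicitly.
      for (StackTraceElement frame : info.getStackTrace()) {
        sb.append("\tat ").append(frame).append('\n');
      }
      sb.append('\n');
    }
    return sb.toString();
  }
}
```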

