[GitHub] flink issue #3760: FLINK-5752 Support push down projections for HBaseTableSo...

2017-06-20 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3760
  
@fhueske 
Do you have some time to check this now, considering the Flink 1.3 release is out?




[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink

2017-05-30 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2332
  
So I think, at a first level, let us have put/delete mutations alone for 
Streaming? Since I am not aware of how Flink users currently interact 
with HBase, I am not sure which HBase ops should be supported. From your case it 
seems puts/deletes are the common cases. 




[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink

2017-05-30 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2332
  
@nragon 
I agree. But does your use case have increments/appends?




[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink

2017-05-29 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2332
  
I can take this up and come up with a design doc. Reading through the comments 
here and the final decision points, I think only puts/deletes can be considered 
idempotent; increments/decrements cannot. We may then need two types of sink: 
one that supports puts/deletes, and another that supports non-idempotent ops.
Coming to @nielsbasjes' issue of not being able to flush the buffered mutator - 
should you always call BufferedMutator#flush() on every checkpoint call?
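
For illustration, a minimal sketch of the flush-on-checkpoint idea (class name and wiring are my own, not from this PR; assumes Flink's CheckpointedFunction and the HBase client API):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;

// Hypothetical sink that buffers Puts/Deletes and flushes them on every
// checkpoint, so a completed checkpoint implies the mutations are durable.
public class HBaseMutationSink extends RichSinkFunction<Mutation> implements CheckpointedFunction {

	private final String tableName;
	private transient Connection connection;
	private transient BufferedMutator mutator;

	public HBaseMutationSink(String tableName) {
		this.tableName = tableName;
	}

	@Override
	public void open(Configuration parameters) throws Exception {
		connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
		mutator = connection.getBufferedMutator(TableName.valueOf(tableName));
	}

	@Override
	public void invoke(Mutation mutation) throws Exception {
		mutator.mutate(mutation); // buffered client-side, not yet durable
	}

	@Override
	public void snapshotState(FunctionSnapshotContext context) throws Exception {
		// at-least-once: replayed Puts/Deletes are idempotent, so flushing
		// here is enough; increments/appends would need a different design
		mutator.flush();
	}

	@Override
	public void initializeState(FunctionInitializationContext context) {
		// no state to restore
	}

	@Override
	public void close() throws Exception {
		if (mutator != null) {
			mutator.close();
		}
		if (connection != null) {
			connection.close();
		}
	}
}
```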




[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink

2017-05-16 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2332
  
@delding 
Are you still working on this?
@nragon 
Let me know how I can be of help here. I can work on this PR too, since I have 
some context on the existing PR, though it is stale.




[GitHub] flink pull request #3881: FLINK-6284 Incorrect sorting of completed checkpoi...

2017-05-15 Thread ramkrish86
Github user ramkrish86 closed the pull request at:

https://github.com/apache/flink/pull/3881




[GitHub] flink issue #3881: FLINK-6284 Incorrect sorting of completed checkpoints in ...

2017-05-12 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3881
  
I won't be available for the next 2 to 3 hours, so feel free to decide at 
your convenience in case you need to cut the RC for the 1.3 release. I 
am sorry that I could not make an initial commit that took care of things 
properly; I should have been more careful. Thanks for the opportunity.




[GitHub] flink issue #3881: FLINK-6284 Incorrect sorting of completed checkpoints in ...

2017-05-12 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3881
  
@tillrohrmann 
Thanks for the new PR. I just executed your change with 101, 99, 100 as 
the checkpoint order. In this case 100 should be the latest one, even though the 
actual ids are not sorted. But with your change and my earlier commit it will 
always sort them as 99, 100, 101.
Can you take a look at my latest commit? It is based on czxid (as per 
your suggestion), and I think that makes sense: whatever the actual id, 
the node that was created most recently in ZooKeeper is the latest checkpoint. But I am 
not very sure whether checkpoint ids can really be added in a non-sorted way, 
i.e. whether 100 can be the latest one even though 101 is also there. 
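
For illustration, a hedged sketch of the czxid-based ordering (parent path and helper are hypothetical, not the actual commit):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.zookeeper.ZooKeeper;

public final class CheckpointOrder {

	// Orders checkpoint znodes by creation order (czxid) instead of by node name,
	// so the most recently created node is last regardless of its numeric id.
	static List<String> childrenByCreationOrder(ZooKeeper zk, String parent) throws Exception {
		List<String> children = new ArrayList<>(zk.getChildren(parent, false));
		children.sort(Comparator.comparingLong(child -> {
			try {
				// Stat.getCzxid() is the zxid of the create() that made this node
				return zk.exists(parent + "/" + child, false).getCzxid();
			} catch (Exception e) {
				throw new RuntimeException(e);
			}
		}));
		return children;
	}
}
```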





[GitHub] flink pull request #3881: FLINK-6284 Incorrect sorting of completed checkpoi...

2017-05-12 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3881#discussion_r116211254
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ---
@@ -346,17 +346,20 @@ public int exists(String pathInZooKeeper) throws Exception {
 			} else {
 				// Initial cVersion (number of changes to the children of this node)
 				int initialCVersion = stat.getCversion();
-
-				List<String> children = ZKPaths.getSortedChildren(
-					client.getZookeeperClient().getZooKeeper(),
-					ZKPaths.fixForNamespace(client.getNamespace(), "/"));
-
-				for (String path : children) {
-					path = "/" + path;
+				List<String> childrenInStr =
+					client.getZookeeperClient().getZooKeeper().
+						getChildren(ZKPaths.fixForNamespace(client.getNamespace(), "/"), false);
+				List<Long> children = new ArrayList<Long>(childrenInStr.size());
+				for (String childNode : childrenInStr) {
+					children.add(new Long(childNode));
--- End diff --

Ok, I see. I am not sure about this MesosWorker. As for using czxid, I am not 
sure if we have an API for it. If so, we can directly use it. Will be back.




[GitHub] flink pull request #3881: FLINK-6284 Incorrect sorting of completed checkpoi...

2017-05-12 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3881#discussion_r116211132
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ---
@@ -346,17 +346,20 @@ public int exists(String pathInZooKeeper) throws Exception {
 			} else {
 				// Initial cVersion (number of changes to the children of this node)
 				int initialCVersion = stat.getCversion();
-
-				List<String> children = ZKPaths.getSortedChildren(
-					client.getZookeeperClient().getZooKeeper(),
-					ZKPaths.fixForNamespace(client.getNamespace(), "/"));
-
-				for (String path : children) {
-					path = "/" + path;
+				List<String> childrenInStr =
+					client.getZookeeperClient().getZooKeeper().
+						getChildren(ZKPaths.fixForNamespace(client.getNamespace(), "/"), false);
+				List<Long> children = new ArrayList<Long>(childrenInStr.size());
+				for (String childNode : childrenInStr) {
+					children.add(new Long(childNode));
+				}
--- End diff --

Here again, my bad. I lost my previous changes because of the compile 
issue, so this was lost too. I have already pushed a new commit for this sort issue.




[GitHub] flink pull request #3881: FLINK-6284 Incorrect sorting of completed checkpoi...

2017-05-12 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3881#discussion_r116211032
  
--- Diff: pom.xml ---
@@ -101,7 +101,8 @@ under the License.
 		<chill.version>0.7.4</chill.version>
 		<asm.version>5.0.4</asm.version>
 		<zookeeper.version>3.4.6</zookeeper.version>
-		<curator.version>2.12.0</curator.version>
+		<curator.version>2.11.0</curator.version>
--- End diff --

Oh, my environment was not able to fetch 2.12.0, so I included this change to 
make things compile. Will revert it.




[GitHub] flink pull request #3881: FLINK-6284 Incorrect sorting of completed checkpoi...

2017-05-12 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3881#discussion_r116188234
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ---
@@ -346,11 +346,7 @@ public int exists(String pathInZooKeeper) throws Exception {
 			} else {
 				// Initial cVersion (number of changes to the children of this node)
 				int initialCVersion = stat.getCversion();
-
-				List<String> children = ZKPaths.getSortedChildren(
-					client.getZookeeperClient().getZooKeeper(),
-					ZKPaths.fixForNamespace(client.getNamespace(), "/"));
-
+				List<String> children = client.getZookeeperClient().getZooKeeper().getChildren(ZKPaths.fixForNamespace(client.getNamespace(), "/"), false);
--- End diff --

Let me do it my older way. I had a patch, but I thought this was better; I had 
only checked the javadoc of ZKPaths. I will push my initial version of the 
patch then, where I convert the List<String> to a List<Long> and use that 
as the sorted order.




[GitHub] flink pull request #3881: FLINK-6284 Incorrect sorting of completed checkpoi...

2017-05-12 Thread ramkrish86
GitHub user ramkrish86 opened a pull request:

https://github.com/apache/flink/pull/3881

FLINK-6284 Incorrect sorting of completed checkpoints in 
ZooKeeperCompletedCheckpointStore

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


Making use of ZooKeeper's getChildren() API directly, so that it just 
creates the list in sequence order. If we go with the ZKPaths API, then we 
need to do some sorting by converting the List<String> to a List<Long>.
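
For illustration, a minimal sketch of the ordering problem with unpadded checkpoint ids as znode names (values hypothetical):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public final class SortDemo {
	public static void main(String[] args) {
		// ZKPaths.getSortedChildren() sorts names lexicographically,
		// which puts "99" after "100" and "101":
		List<String> names = new ArrayList<>(Arrays.asList("101", "99", "100"));
		Collections.sort(names); // [100, 101, 99] -- wrong checkpoint order
		// converting List<String> to List<Long> restores the real order:
		List<Long> ids = new ArrayList<>(names.size());
		for (String name : names) {
			ids.add(Long.valueOf(name));
		}
		Collections.sort(ids); // [99, 100, 101]
		System.out.println(names + " vs " + ids);
	}
}
```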

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ramkrish86/flink FLINK-6284

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3881.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3881


commit 33bf37a2d706af6c8eb6cbe9d58aa3ac9d1f03e0
Author: Ramkrishna <ramkrishna.s.vasude...@intel.com>
Date:   2017-05-12T08:18:16Z

FLINK-6284 Incorrect sorting of completed checkpoints in
ZooKeeperCompletedCheckpointStore






[GitHub] flink issue #3760: FLINK-5752 Support push down projections for HBaseTableSo...

2017-04-27 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3760
  
@fhueske 
Thanks for the update. Got it.
But I would like to say that if there are any issues/JIRAs where I could be of 
help for the 1.3 release fork, I would be happy to help. Please point me to those 
you think I can help with; I can have a look and commit to whatever I can spend 
time on.




[GitHub] flink issue #3760: FLINK-5752 Support push down projections for HBaseTableSo...

2017-04-26 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3760
  
I checked the recent failure.
`Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 20.417 sec 
<<< FAILURE! - in org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest`

This failure seems unrelated to the changes in this patch.
@tonycox , @fhueske 
Just a gentle ping.




[GitHub] flink issue #3760: FLINK-5752 Support push down projections for HBaseTableSo...

2017-04-25 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3760
  
@tonycox 
Thanks for the comment. Just curious - any reason for the repush? Is it for 
checking the test failures again?




[GitHub] flink issue #3760: FLINK-5752 Support push down projections for HBaseTableSo...

2017-04-24 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3760
  
The failures seem not directly related.




[GitHub] flink issue #3760: FLINK-5752 Support push down projections for HBaseTableSo...

2017-04-24 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3760
  
@fhueske , @tonycox 
Can you have a look at this PR? Thanks.




[GitHub] flink pull request #3760: FLINK-5752 Support push down projections for HBase...

2017-04-24 Thread ramkrish86
GitHub user ramkrish86 opened a pull request:

https://github.com/apache/flink/pull/3760

FLINK-5752 Support push down projections for HBaseTableSource (Ram)

Ran mvn clean verify -DskipTests.
In this patch 
`Arrays.sort(nestedFields[i]);`

I am doing this before calling addColumns for the new projected table source, 
because the cols arrive in reverse-sorted order, and when we apply that 
to the new projected table source it triggers an assertion error while creating 
the new Calcite program with the projected cols:
`assert expr.getType().getFieldList().get(field.getIndex()) == field;`
Doing this sort corrects those issues and the tests run fine. 
But the nestedFields being passed to the projectNestedFields() API is 
created by
`def getProjectedFields: Array[Array[String]]` under RexProgramExtractor. 
Trying to understand what it does. 
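
For illustration only (column names hypothetical, not from the PR), the normalization this sort performs per column family:

```java
import java.util.Arrays;

public final class NestedFieldsDemo {
	public static void main(String[] args) {
		// one inner array per column family; qualifiers may arrive reverse-sorted
		String[][] nestedFields = {
			{"q3", "q2", "q1"},
			{"b", "a"}
		};
		for (int i = 0; i < nestedFields.length; i++) {
			// sort qualifiers so the rebuilt Calcite program sees the
			// projected columns in a stable, expected order
			Arrays.sort(nestedFields[i]);
		}
		System.out.println(Arrays.deepToString(nestedFields)); // [[q1, q2, q3], [a, b]]
	}
}
```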



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ramkrish86/flink FLINK-5752

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3760.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3760


commit 1885b0c9afc7aa49747987b0508ba82002a1bc8d
Author: Ramkrishna <ramkrishna.s.vasude...@intel.com>
Date:   2017-04-24T10:23:08Z

FLINK-5752 Support push down projections for HBaseTableSource (Ram)






[GitHub] flink issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGr...

2017-03-09 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3334
  
@StephanEwen 
No problem. I appreciate your time and efforts. 




[GitHub] flink issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGr...

2017-03-08 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3334
  
@StephanEwen 
I saw, in another JIRA, one of your comments where you talked about 
refactoring CheckpointCoordinator and PendingCheckpoint. So would you like this 
PR to wait till then?




[GitHub] flink pull request #3478: Flink 4816 Executions failed from "DEPLOYING" shou...

2017-03-06 Thread ramkrish86
GitHub user ramkrish86 opened a pull request:

https://github.com/apache/flink/pull/3478

Flink 4816 Executions failed from "DEPLOYING" should retain restored 
checkpoint information

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

Just does whatever the description says. Feedback/suggestions are welcome. 
Ran 'mvn clean verify -DskipTests'; no static check failures.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ramkrish86/flink FLINK-4816

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3478.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3478


commit 8cc66b918628fc1c5febb9408829c73e2ad59c46
Author: Ramkrishna <ramkrishna.s.vasude...@intel.com>
Date:   2017-03-06T11:25:37Z

FLINK-4816 Executions failed from "DEPLOYING" should retain restored
checkpoint information

commit 5a1bb31668dc81224c2aea870213a7cba1be3352
Author: Ramkrishna <ramkrishna.s.vasude...@intel.com>
Date:   2017-03-06T11:41:58Z

Add lock to getRestoredCheckpointID






[GitHub] flink issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGr...

2017-03-06 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3334
  
Just updated and did a force push to avoid the merge commit. Now things are 
fine.




[GitHub] flink issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGr...

2017-03-03 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3334
  
Ping for reviews here!




[GitHub] flink issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGr...

2017-03-03 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3334
  
@StephanEwen , @wenlong88 , @shixiaogang 
Please have a look at the latest push. I am now tracking failures in 
checkpointing and incrementing a new counter based on them. Also added test 
cases. 
I have not changed the constructors of the affected classes because that 
touches many files; I can update it based on feedback on the latest PR.




[GitHub] flink issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGr...

2017-03-02 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3334
  
I think I got a better way to track this. Will update the PR soon.




[GitHub] flink issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGr...

2017-03-01 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3334
  
Thanks for the input. I read the code. There are two ways a checkpoint 
fails (as per my understanding of the code). If for some reason checkpointing 
cannot be performed, we send a DeclineCheckpoint message, which is handled by 
the CheckpointCoordinator.
The other is if there is an external error during checkpointing, in which case 
we call failExternally, which transitions the state to FAILED, closes all 
the watchdogs, and also cancels the invokable. Now, is the intent to track how 
many times this happens, i.e. track such occurrences of failure and then 
fail the execution graph?




[GitHub] flink issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGr...

2017-03-01 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3334
  
I think I got what you are saying here. Execution#triggerCheckpoint 
is the actual checkpoint call, and currently we don't track whether it 
fails. So your point is that it is better to know if there was a failure in the 
actual checkpoint triggering at the Task level, and then count that as a 
failure. Am I right, @wenlong88 ?




[GitHub] flink issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGr...

2017-03-01 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3334
  
@wenlong88 
Can you say more about what you mean by checkpointing failure vs. trigger 
failure? If you mean tracking the number of times the execution fails after 
restoring from a checkpoint, I think FLINK-4815 is focused on that.




[GitHub] flink pull request #3334: FLINK-4810 Checkpoint Coordinator should fail Exec...

2017-03-01 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3334#discussion_r103638771
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
@@ -537,12 +562,27 @@ else if (!props.forceCheckpoint()) {
 				if (!checkpoint.isDiscarded()) {
 					checkpoint.abortError(new Exception("Failed to trigger checkpoint"));
 				}
+				if (numUnsuccessful > maxUnsuccessfulCheckpoints) {
+					return failExecution(executions);
+				}
 				return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
 			}
 
 		} // end trigger lock
 	}
 
+	private CheckpointTriggerResult failExecution(Execution[] executions) {
+		if (currentPeriodicTrigger != null) {
+			currentPeriodicTrigger.cancel();
+			currentPeriodicTrigger = null;
+		}
+		for (Execution execution : executions) {
+			// fail the graph
+			execution.fail(new Throwable("The number of max unsuccessful checkpoints attempts exhausted"));
--- End diff --

I verified the code once again. There is no reference to ExecutionGraph in 
CheckpointCoordinator, and calling fail on the current Execution actually 
triggers the restart flow:
Execution#fail() -> marks state as FAILED -> vertex#executionFailed() -> 
graph#jobVertexInFinalState(). So you think this way of failing won't work?




[GitHub] flink pull request #3334: FLINK-4810 Checkpoint Coordinator should fail Exec...

2017-02-28 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3334#discussion_r103612421
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
@@ -537,12 +562,27 @@ else if (!props.forceCheckpoint()) {
 				if (!checkpoint.isDiscarded()) {
 					checkpoint.abortError(new Exception("Failed to trigger checkpoint"));
 				}
+				if (numUnsuccessful > maxUnsuccessfulCheckpoints) {
+					return failExecution(executions);
+				}
 				return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
 			}
 
 		} // end trigger lock
 	}
 
+	private CheckpointTriggerResult failExecution(Execution[] executions) {
+		if (currentPeriodicTrigger != null) {
+			currentPeriodicTrigger.cancel();
+			currentPeriodicTrigger = null;
+		}
+		for (Execution execution : executions) {
+			// fail the graph
+			execution.fail(new Throwable("The number of max unsuccessful checkpoints attempts exhausted"));
--- End diff --

Ok sure. I will add tests for this.




[GitHub] flink pull request #3334: FLINK-4810 Checkpoint Coordinator should fail Exec...

2017-02-28 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3334#discussion_r103612320
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
@@ -121,6 +121,8 @@
 	/** The maximum number of checkpoints that may be in progress at the same time */
 	private final int maxConcurrentCheckpointAttempts;
+	/** The maximum number of unsuccessful checkpoints */
+	private final int maxUnsuccessfulCheckpoints;
--- End diff --

ok.




[GitHub] flink issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGr...

2017-02-27 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3334
  
@StephanEwen - Ping for initial reviews. Will work on it based on the 
feedback.




[GitHub] flink pull request #3334: FLINK-4810 Checkpoint Coordinator should fail Exec...

2017-02-16 Thread ramkrish86
GitHub user ramkrish86 opened a pull request:

https://github.com/apache/flink/pull/3334

FLINK-4810 Checkpoint Coordinator should fail ExecutionGraph after "n" 
unsuccessful checkpoints

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


Ran mvn clean verify. Did not add test cases yet, pending first-level 
feedback. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ramkrish86/flink FLINK-4810

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3334.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3334


commit 6e0fb38272e6bb59528065461c6ec6fdd43689ad
Author: Ramkrishna <ramkrishna.s.vasude...@intel.com>
Date:   2017-02-16T11:29:37Z

FLINK-4810 Checkpoint Coordinator should fail ExecutionGraph after "n"
unsuccessful checkpoints






[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-02-15 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
Thanks to all - @fhueske , @tonycox and @wuchong - for helping to get this 
in.




[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-02-13 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
@fhueske - Are you fine with that pom change? If so we can get this in.




[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-02-13 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
I think that is not the only reason. Somewhere, either these tests are 
creating more static objects or bigger objects that live for the lifetime of 
the JVM; maybe this test just exposes it. Actually, this change in MaxPermSize 
calls for a discussion on what the 'mvn' settings should be and what the 
minimum heap size required is. Also, in other projects the default JDK version 
has now moved to 8, at least for the trunk version. I think a similar thing can 
be done here. Thanks @tonycox .




[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-02-13 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
@fhueske - so are you OK with @tonycox 's suggestion of setting MaxPermSize 
for the hbase module?
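
For illustration, a hedged sketch of such a setting (module placement and values are assumptions, not the actual change):

```xml
<!-- hypothetical flink-hbase/pom.xml fragment; values illustrative.
     -XX:MaxPermSize only matters on JDK 7; JDK 8 removed PermGen and ignores it. -->
<plugin>
	<groupId>org.apache.maven.plugins</groupId>
	<artifactId>maven-surefire-plugin</artifactId>
	<configuration>
		<argLine>-Xmx1024m -XX:MaxPermSize=256m</argLine>
	</configuration>
</plugin>
```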




[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-02-13 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
Even jhat was not able to view the file, as it had a problem parsing the 
hprof file. So my question is: if MaxPermSize is 128M, why does it work with 
JDK 8? 




[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-02-13 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
I tried multiple ways to take a dump file from this mvn test command run. 
I get an hprof file which, on opening in a heap dump analyzer, throws an EOF 
or NPE exception. 
@tonycox - were you able to get any heap dump?




[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-02-12 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
I tried to compare TableInputFormat and the new one. But the interesting 
part is that HBaseInputFormat does not fail on JDK 8, which was the default in 
my test env.




[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-02-10 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
Thanks to @tonycox for helping me reproduce this error. Changing to 
JDK 7 creates this issue, and it occurs because PermGen space runs out of 
memory. I don't have a solution for this; it runs with JDK 8 with no hassles. 
Any inputs here?




[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-02-09 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
> Results :
> 
> Tests run: 2, Failures: 0, Errors: 0, Skipped: 0

This is what I get as the test result.




[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-02-09 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
It is CentOS Linux release 7.0.1406 (Core). 




[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-02-09 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
I even tried that. The test runs fine for me; I get no OOME.




[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-02-07 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
`02/02/2017 06:19:20 DataSink (collect())(6/32) switched to SCHEDULED
02/02/2017 06:19:20 DataSink (collect())(2/32) switched to SCHEDULED
02/02/2017 06:19:20 DataSink (collect())(8/32) switched to SCHEDULED
02/02/2017 06:19:20 DataSink (collect())(6/32) switched to DEPLOYING
02/02/2017 06:19:20 DataSink (collect())(8/32) switched to DEPLOYING
02/02/2017 06:19:20 DataSink (collect())(2/32) switched to DEPLOYING
02/02/2017 06:19:25 DataSink (collect())(1/32) switched to SCHEDULED
02/02/2017 06:19:25 DataSink (collect())(1/32) switched to DEPLOYING
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "JvmPauseMonitor"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "hconnection-0x2247c79b-shared--pool20-t1"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "JvmPauseMonitor"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "flink-akka.actor.default-dispatcher-3"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "IPC Server handler 1 on 34267"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "CHAIN DataSource (localhost:53456)"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "CHAIN DataSource (localhost:53456)"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "CHAIN DataSource (localhost:53456)"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "org.apache.hadoop.hdfs.PeerCache@28360f4a"`

The same tests run cleanly on my Linux box. Is there any place I can 
download the logs to see what the issue could be?




[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-02-07 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
> Will need to figure out what's the reason for that before I can merge the 
> PR.

I tried running those tests again on my Linux box and all went through 
without any errors.




[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-02-06 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
@fhueske - A gentle reminder!




[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-02-01 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
@fhueske - Please have a look at the javadoc.




[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-02-01 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
> For now I'd suggest to keep the scope of the PR as it is right now. A bit 
> more Java documentation on HBaseTableSource to explain how it is used would be 
> great. We can implement the NestedFieldsProjectableTableSource and the changes 
> to HBaseTableSource in a follow up issue.

+1 for this. I can add some more javadoc to it. BTW, I am trying to check out 
these projections and the use of ProjectableTableSource. Will be back on it. 
Thanks to everyone for all the comments and feedback.




[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-02-01 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
> ProjectableTableSource works in scan process.

Ya, got it. I was just trying to relate it to this HBase thing and could see 
that we try to read all cols, then do a flatMap, and then return only the 
required cols. Just read that PR to understand better.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-02-01 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
Going through the PR for 
https://issues.apache.org/jira/browse/FLINK-3848, I think we try to project 
only the required columns. We could do similarly here. So my and @tonycox 
's suggestion of having a new kind of ProjectableTableSource could help here. 




[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-02-01 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
`Table result = tableEnv.sql("SELECT test1.f1.q1, test1.f2.q2 FROM test1 WHERE test1.f1.q1 < 103");`

I just tried this query and it works with or without 
ProjectableTableSource. So I just wanted to know when the projection comes 
into play; I thought that without projection, queries of this sort might not 
work. 






[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-01-31 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
> Regarding the HBaseTableSchema, we could also use it only internally and 
> not expose it to the user. The HBaseTableSource would have a method addColumn() 
> and forward the calls to its internal HBaseSchema.

Have done this. I initially thought to do things at construction time 
itself; now I have added an addColumn() to HBaseTableSource, and the 
HBaseTableSchema becomes totally package-private with no access for users.

Regarding the flat schema: generally in HBase only the family is required, and 
the qualifiers are just dynamic. But here, for the sake of accessibility, we 
expect the user to specify the column names (trying to give it a relational 
look). It is in the case of projections where we have some issues. In my 
opinion, if we had a better API for projection, maybe we could handle it 
better? The current nested way, as you said, is better in the sense that all 
columns of a family are grouped together. 
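
For illustration, a hedged sketch of the intended usage (table and column names hypothetical; signatures as I understand this PR, import paths may differ per Flink version):

```java
import org.apache.flink.addons.hbase.HBaseTableSource;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.hadoop.conf.Configuration;

public final class HBaseSourceExample {
	// registers an HBase-backed table with a nested (family.qualifier) schema
	static void register(BatchTableEnvironment tableEnv, Configuration conf) {
		HBaseTableSource hSrc = new HBaseTableSource(conf, "test1");
		hSrc.addColumn("f1", "q1", Integer.class);
		hSrc.addColumn("f1", "q2", String.class);
		hSrc.addColumn("f2", "q2", Long.class);
		tableEnv.registerTableSource("test1", hSrc);
	}
}
```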




[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-01-31 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
Just a general question about
`def projectFields(fields: Array[Int]): ProjectableTableSource[T]`

Is it mandatory to only have int[] here? Could we have a String[] that allows 
specifying particular names? Maybe even more generic would be a 
ColumnarTableProjectableSource which allows specifying the family-to-column 
mapping in some way. Ultimately it is the table source that is going to do the 
mapping and create the projected table source, so that should be fine?
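
A rough, hypothetical sketch of that idea (not actual Flink code):

```java
import org.apache.flink.table.sources.TableSource;

// Hypothetical: project by qualified column names such as "f1.q1"
// instead of positional int indexes, letting the source map
// families/qualifiers itself.
public interface ColumnarProjectableTableSource<T> {
	TableSource<T> projectFields(String[] fieldNames);
}
```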




[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-01-31 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
Updated the PR, fixing the comments. The comments were simple, but adding 
AbstractTableInputFormat and moving the code back and forth makes this a 
bigger change. Internally they are just refactorings. The test cases pass.




[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-01-31 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
To understand better:

> We could make flat schema an optional mode or implement it as a separate 
> TableSource as well.

and this one:

> This could be solved if we use a flat schema and encode the nesting as 
> columnFamily$column

Are you talking about using separators for it? Maybe I am not getting your 
concern here. I agree that the nested schema is the better API, but if we go 
with a flat schema then maintaining the family-to-qualifier relation may not be 
easy. As you said, a separate TableSource where we define such things would be 
better.
Regarding HBaseTableSchema, I think keeping it is better, so that we can modify 
that class for better serialization and deserialization by adding more logic 
for different types of classes. Even if we go with a flat schema, I think this 
type of class would help us maintain the family-to-qualifier mapping.





[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-31 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r98828506
  
--- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables. Returns the result as {@link Row}
+ */
+public class HBaseTableSourceInputFormat extends TableInputFormat<Row> implements ResultTypeQueryable<Row> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
+	private String tableName;
+	private transient Connection conn;
+	private transient org.apache.hadoop.conf.Configuration conf;
+	private HBaseTableSchema schema;
+
+	public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, HBaseTableSchema schema) {
+		this.tableName = tableName;
+		this.conf = conf;
+		this.schema = schema;
+	}
+
+	@Override
+	public void configure(Configuration parameters) {
+		LOG.info("Initializing HBaseConfiguration");
+		connectToTable();
+		if (table != null) {
+			scan = getScanner();
+		}
+	}
+
+	@Override
+	protected Scan getScanner() {
+		Scan scan = new Scan();
+		for (String family : schema.getFamilyNames()) {
--- End diff --

Actually in HBase everything is lexicographically sorted (as byte[]). But 
here, since we represent families and qualifiers as Strings, the map we use in 
HBaseTableSchema should sort them in their natural order.
The results we retrieve will always be lexicographically sorted, and we fetch 
the specific cell from the result using this

>  res.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
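
As a sketch of what I mean (reusing the accessors from the current patch; the 
method itself is illustrative, not final code):

    // Sketch: build a flat Row from a Result by asking for each cell
    // explicitly, so the schema's order - not HBase's lexicographic cell
    // order - decides the field positions.
    private Row mapResultToRow(Result res, HBaseTableSchema schema) {
        List<Object> values = new ArrayList<>();
        for (String family : schema.getFamilyNames()) {
            for (Pair<String, TypeInformation> col : schema.getFamilyMap().get(family)) {
                byte[] raw = res.getValue(Bytes.toBytes(family), Bytes.toBytes(col.getFirst()));
                values.add(raw == null ? null : schema.deserialize(raw, col.getSecond()));
            }
        }
        Row row = new Row(values.size());
        for (int i = 0; i < values.size(); i++) {
            row.setField(i, values.get(i));
        }
        return row;
    }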




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-31 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r98828345
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
 ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables. Returns 
the result as {@link Row}
+ */
+public class HBaseTableSourceInputFormat extends TableInputFormat 
implements ResultTypeQueryable {
--- End diff --

Here by Row, do you mean the generic type that is passed to this class? 
Because an HBase table also has the concept of a row, which we are not using 
here. So I feel the current name is better. What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-31 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r98828283
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Map;
+
+/**
+ * Creates a table source that helps to scan data from an hbase table
+ *
+ * Note : the colNames are specified along with a familyName and they are 
seperated by a ':'
--- End diff --

True. Will remove.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-01-30 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
@tonycox 
I have addressed all your latest comments, including making HBaseTableSource 
a ProjectableTableSource.
@wuchong , @fhueske 
Are you guys fine with the latest updates? If so, we can try to close this PR, 
and further discussion of adding a StreamTableSource and supporting WHERE 
clauses can happen in subsequent PRs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-30 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r98487401
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Time;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.sql.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List<Pair<String, TypeInformation>>> 
familyMap =
--- End diff --

> I think PushProjectIntoBatchTableSourceScanRule is not good enough for 
nested data types. but we can project at least family columns now.

So you mean we will project the column families. One thing to remember: even 
though we add the two families as Education and Department, when we retrieve 
them Department would come first and then Education. So what matters is how we 
retrieve the result.
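
In code, family-level projection would just be something like this 
('projectedFamilies' is a made-up variable for the sketch):

    // Sketch: project at column-family granularity by adding whole
    // families to the scan; the ordering of cells inside the Result is
    // then handled when we map it back to a Row.
    Scan scan = new Scan();
    for (String family : projectedFamilies) {
        scan.addFamily(Bytes.toBytes(family));
    }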


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-30 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r98486712
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Time;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.sql.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List<Pair<String, TypeInformation>>> 
familyMap =
+   new HashMap<>();
+
+   // Allowed types. This may change.
+   // TODO : Check if the Date type should be the one in java.util or the 
one in java.sql
+   private static Class[] CLASS_TYPES = {
+   Integer.class, Short.class, Float.class, Long.class, 
String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, 
BigDecimal.class, Date.class, Time.class, byte[].class
--- End diff --

Ok, I can do that. I have not used these ImmutableCollections much; in fact, 
in our project we removed the dependency on them, hence I went with the 
default way.
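
For reference, the plain-JDK alternative I had in mind (a sketch built from 
the CLASS_TYPES array above, using java.util.Collections and Arrays):

    // Sketch: an unmodifiable whitelist without the Guava dependency.
    private static final List<Class<?>> CLASS_TYPES = Collections.unmodifiableList(
        Arrays.asList(Integer.class, Short.class, Float.class, Long.class,
            String.class, Byte.class, Boolean.class, Double.class,
            BigInteger.class, BigDecimal.class, Date.class, Time.class,
            byte[].class));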


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-30 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r98486522
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Time;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.sql.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List<Pair<String, TypeInformation>>> 
familyMap =
--- End diff --

I just felt a different Map type is not needed here; the key is a String 
anyway. If you feel strongly about it, I can change it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-01-29 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
I am not sure how we will manage this. One thing that can be done: if an 
invalid family name is added to the scan, HBase internally throws a 
family-not-found exception, so we can catch that and report back. But I am not 
sure how to track the schema. So every time a user wants to scan two families, 
they have to call HBaseTableSchema#addColumns() twice (with the required 
qualifiers).
@tonycox 
Can you help me understand what you mean by 'so we can add more maps that 
generate after familyMap has set'?
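
On the user side that looks roughly like this (a usage sketch; the exact 
signature of addColumns() has varied between revisions of this patch):

    // Usage sketch: one addColumns() call per (family, qualifier) pair;
    // scanning two families just means registering columns under both.
    HBaseTableSchema schema = new HBaseTableSchema();
    schema.addColumns("f1", "q1", BasicTypeInfo.INT_TYPE_INFO);
    schema.addColumns("f2", "q2", BasicTypeInfo.STRING_TYPE_INFO);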


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-01-27 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
For ProjectableTableSource I think I need some clarity, because it is 
currently based on an int[] representing the fields, so I am not sure how to 
map those indices to qualifiers under a family.
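
To show where my doubt is, a hypothetical mapping from the planner's int[] 
back to (family, qualifier) pairs could look like this (none of this is in 
the PR yet):

    // Hypothetical sketch: flatten the schema in a fixed order, then use
    // the projected field indices to pick the columns for the scan.
    private Scan buildProjectedScan(HBaseTableSchema schema, int[] fields) {
        List<Pair<String, String>> flatColumns = new ArrayList<>();
        for (String family : schema.getFamilyNames()) {
            for (Pair<String, TypeInformation> col : schema.getFamilyMap().get(family)) {
                flatColumns.add(new Pair<>(family, col.getFirst()));
            }
        }
        Scan scan = new Scan();
        for (int field : fields) {
            Pair<String, String> col = flatColumns.get(field);
            scan.addColumn(Bytes.toBytes(col.getFirst()), Bytes.toBytes(col.getSecond()));
        }
        return scan;
    }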


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-01-26 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
@fhueske ,
I have fixed the comments as per @wuchong, and he gave a +1 after the fixes. 
Could you take a look at the PR and see if it is fine with you too?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-01-26 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
> Exception: java.lang.OutOfMemoryError thrown from the 
UncaughtExceptionHandler in thread 
"RpcServer.reader=6,bindAddress=testing-docker-bb4f2e37-e79f-42a3-a9e9-4995e42c70ba,port=45919"
Exception: java.lang.OutOfMemoryError thrown from the 
UncaughtExceptionHandler in thread "CHAIN DataSource (at 
getDataSet(HBaseTableSource.java:63) 
(org.apache.flink.addons.hbase.HBaseTableSourceInputFormat)) -> FlatMap 
(select: (f1.q1 AS q1, f1.q2 AS q2, f1.q3 AS q3)) (6/32)"
01/27/2017 05:57:12 DataSink (collect())(13/32) switched to DEPLOYING 
Exception: java.lang.OutOfMemoryError thrown from the 
UncaughtExceptionHandler in thread "LeaseRenewer:travis@localhost:39289"
01/27/2017 05:57:14 DataSink (collect())(12/32) switched to SCHEDULED 
Exception: java.lang.OutOfMemoryError thrown from the 
UncaughtExceptionHandler in thread "CHAIN DataSource (at 
getDataSet(HBaseTableSource.java:63) 
(org.apache.flink.addons.hbase.HBaseTableSourceInputFormat)) -> FlatMap 
(select: (f1.q1 AS q1, f1.q2 AS q2, f1.q3 AS q3)) (10/32)"

Getting this error in the Travis build, but on my Linux box the tests seem to 
pass.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-01-26 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
@wuchong 
I think I have addressed your latest comments. Thank you for all your 
help/support here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-26 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r98145293
  
--- Diff: 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
 ---
@@ -0,0 +1,196 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.HBaseTableSchema;
+import org.apache.flink.addons.hbase.HBaseTableSource;
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter 
{
+
+   private static final byte[] ROW_1 = Bytes.toBytes("row1");
+   private static final byte[] ROW_2 = Bytes.toBytes("row2");
+   private static final byte[] ROW_3 = Bytes.toBytes("row3");
+   private static final byte[] F_1 = Bytes.toBytes("f1");
+   private static final byte[] F_2 = Bytes.toBytes("f2");
+   private static final byte[] Q_1 = Bytes.toBytes("q1");
+   private static final byte[] Q_2 = Bytes.toBytes("q2");
+   private static final byte[] Q_3 = Bytes.toBytes("q3");
+
+   @BeforeClass
+   public static void activateHBaseCluster(){
+   registerHBaseMiniClusterInClasspath();
+   }
+
+   @Test
+   public void testHBaseTableSourceWithSingleColumnFamily() throws 
Exception {
+   // create a table with single region
+   TableName tableName = TableName.valueOf("test");
+   // no split keys
+   byte[][] famNames = new byte[1][];
+   famNames[0] = F_1;
+   createTable(tableName, famNames, null);
+   // get the htable instance
+   HTable table = openTable(tableName);
+   List puts = new ArrayList();
+   // add some data
+   Put put = new Put(ROW_1);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(100));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_3, Bytes.toBytes(19991L));
+   puts.add(put);
+
+   put = new Put(ROW_2);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(101));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue1"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_3, Bytes.toBytes(19992L));
+   puts.add(put);
+
+   put = new Put(ROW_3);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(102));
+  

[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
Fixed all the minor comments given above. @tonycox , @wuchong , @fhueske .


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97934537
  
--- Diff: 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
 ---
@@ -0,0 +1,248 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.HBaseTableSchema;
+import org.apache.flink.addons.hbase.HBaseTableSource;
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter 
{
+
+   public static final byte[] ROW_1 = Bytes.toBytes("row1");
+   public static final byte[] ROW_2 = Bytes.toBytes("row2");
+   public static final byte[] ROW_3 = Bytes.toBytes("row3");
+   public static final byte[] F_1 = Bytes.toBytes("f1");
+   public static final byte[] F_2 = Bytes.toBytes("f2");
+   public static final byte[] Q_1 = Bytes.toBytes("q1");
+   public static final byte[] Q_2 = Bytes.toBytes("q2");
+   public static final byte[] Q_3 = Bytes.toBytes("q3");
+
+   @BeforeClass
+   public static void activateHBaseCluster(){
+   registerHBaseMiniClusterInClasspath();
+   }
+
+   @Test
+   public void testHBaseTableSourceWithSingleColumnFamily() throws 
Exception {
+   // create a table with single region
+   MapFunction<Row, String> mapFunction = new MapFunction<Row, 
String>() {
+
+   @Override
+   public String map(Row value) throws Exception {
+   return value == null ? "null" : 
value.toString();
+   }
+   };
+   TableName tableName = TableName.valueOf("test");
+   // no split keys
+   byte[][] famNames = new byte[1][];
+   famNames[0] = F_1;
+   createTable(tableName, famNames, null);
+   // get the htable instance
+   HTable table = openTable(tableName);
+   List puts = new ArrayList();
+   // add some data
+   Put put = new Put(ROW_1);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(100));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_3, Bytes.toBytes(19991l));
+   puts.add(put);
+
+   put = new Put(ROW_2);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(101));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue1"));
+   // 3rd qual is long
+

[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97934510
  
--- Diff: 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
 ---
@@ -0,0 +1,248 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.HBaseTableSchema;
+import org.apache.flink.addons.hbase.HBaseTableSource;
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter 
{
+
+   public static final byte[] ROW_1 = Bytes.toBytes("row1");
+   public static final byte[] ROW_2 = Bytes.toBytes("row2");
+   public static final byte[] ROW_3 = Bytes.toBytes("row3");
+   public static final byte[] F_1 = Bytes.toBytes("f1");
+   public static final byte[] F_2 = Bytes.toBytes("f2");
+   public static final byte[] Q_1 = Bytes.toBytes("q1");
+   public static final byte[] Q_2 = Bytes.toBytes("q2");
+   public static final byte[] Q_3 = Bytes.toBytes("q3");
+
+   @BeforeClass
+   public static void activateHBaseCluster(){
+   registerHBaseMiniClusterInClasspath();
+   }
+
+   @Test
+   public void testHBaseTableSourceWithSingleColumnFamily() throws 
Exception {
+   // create a table with single region
+   MapFunction<Row, String> mapFunction = new MapFunction<Row, 
String>() {
+
+   @Override
+   public String map(Row value) throws Exception {
+   return value == null ? "null" : 
value.toString();
+   }
+   };
+   TableName tableName = TableName.valueOf("test");
+   // no split keys
+   byte[][] famNames = new byte[1][];
+   famNames[0] = F_1;
+   createTable(tableName, famNames, null);
+   // get the htable instance
+   HTable table = openTable(tableName);
+   List puts = new ArrayList();
+   // add some data
+   Put put = new Put(ROW_1);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(100));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue"));
+   // 3rd qual is long
+   put.addColumn(F_1, Q_3, Bytes.toBytes(19991l));
+   puts.add(put);
+
+   put = new Put(ROW_2);
+   // add 3 qualifiers per row
+   //1st qual is integer
+   put.addColumn(F_1, Q_1, Bytes.toBytes(101));
+   //2nd qual is String
+   put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue1"));
+   // 3rd qual is long
+

[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97934502
  
--- Diff: 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
 ---
@@ -0,0 +1,248 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.HBaseTableSchema;
+import org.apache.flink.addons.hbase.HBaseTableSource;
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter 
{
+
+   public static final byte[] ROW_1 = Bytes.toBytes("row1");
+   public static final byte[] ROW_2 = Bytes.toBytes("row2");
+   public static final byte[] ROW_3 = Bytes.toBytes("row3");
+   public static final byte[] F_1 = Bytes.toBytes("f1");
+   public static final byte[] F_2 = Bytes.toBytes("f2");
+   public static final byte[] Q_1 = Bytes.toBytes("q1");
+   public static final byte[] Q_2 = Bytes.toBytes("q2");
+   public static final byte[] Q_3 = Bytes.toBytes("q3");
--- End diff --

Yes, my bad. Will remove.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97934495
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Creates a table source that helps to scan data from an hbase table
+ *
+ * Note : the colNames are specified along with a familyName and they are 
seperated by a ':'
+ * For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier 
name
+ */
+// TODO : Implement ProjectableTableSource?
+public class HBaseTableSource implements BatchTableSource {
--- End diff --

I am not sure. For now I think we will implement BatchTableSource only and 
implement StreamTableSource later. Is there any significant design expectation 
for a source to be a StreamTableSource?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97934428
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List<Pair<String, TypeInformation>>> 
familyMap =
+   new HashMap<String, List<Pair<String, TypeInformation>>>();
+
+   // Allowed types. This may change.
+   // TODO : Check if the Date type should be the one in java.util or the 
one in java.sql
--- End diff --

Ok. Makes sense.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97934446
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List<Pair<String, TypeInformation>>> 
familyMap =
+   new HashMap<String, List<Pair<String, TypeInformation>>>();
+
+   // Allowed types. This may change.
+   // TODO : Check if the Date type should be the one in java.util or the 
one in java.sql
+   private static Class[] CLASS_TYPES = {
+   Integer.class, Short.class, Float.class, Long.class, 
String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, 
BigDecimal.class, Date.class, byte[].class
+   };
+   /**
+* Allows specifying the family and qualifier name along with the data 
type of the qualifier for an HBase table
+*
+* @param familythe family name
+* @param qualifier the qualifier name
+* @param clazz the data type of the qualifier
+*/
+   public void addColumn(String family, String qualifier, Class clazz) {
+   Preconditions.checkNotNull(family, "family name");
+   Preconditions.checkNotNull(family, "qualifier name");
--- End diff --

Good catch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97934406
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List<Pair<String, TypeInformation>>> 
familyMap =
+   new HashMap<String, List<Pair<String, TypeInformation>>>();
--- End diff --

Ok. In our other projects we used to qualify the generic on both sides. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
@fhueske , @tonycox , @wuchong - FYI.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
Updated the code with the comments and pushed again. I think I have addressed 
all the comments here; feedback/comments welcome. I also found that it is 
better to use the TableInputSplit to specify the start and end row, so that 
the scan is restricted to the given range.
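
Roughly, the open() path then does something like this (a sketch; the 
bookkeeping in the real input format is omitted):

    // Sketch: restrict the scan to the split's key range so that each
    // input split only reads the rows of the region it was created for.
    @Override
    public void open(TableInputSplit split) throws IOException {
        Scan scan = getScanner();
        scan.setStartRow(split.getStartRow());
        scan.setStopRow(split.getEndRow());
        resultScanner = table.getScanner(scan);
    }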


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97738061
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List> familyMap =
+   new HashMap<String, List>();
+
+   // Allowed types. This may change.
+   // TODO : Check if the Date type should be the one in java.util or the 
one in java.sql
+   private static Class[] CLASS_TYPES = {
+   Integer.class, Short.class, Float.class, Long.class, 
String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, 
BigDecimal.class, Date.class
+   };
+   private static byte[] EMPTY_BYTE_ARRAY = new byte[0];
+   public void addColumns(String family, String qualifier, 
TypeInformation type) {
+   Preconditions.checkNotNull(family, "family name");
+   Preconditions.checkNotNull(family, "qualifier name");
+   Preconditions.checkNotNull(type, "type name");
+   List list = this.familyMap.get(family);
+   if (list == null) {
+   list = new ArrayList();
+   }
+   boolean found = false;
+   for(Class classType : CLASS_TYPES) {
+   if(classType == type.getTypeClass()) {
+   found = true;
+   break;
+   }
+   }
+   if(!found) {
+   // by default it will be byte[] type only
+   type = BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO;
--- End diff --

Ok, I will add byte[].class to CLASS_TYPES, and for anything other than the 
types in CLASS_TYPES we can throw an exception.
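
i.e. roughly this (a sketch of the stricter check; validate() is a made-up 
helper name):

    // Sketch: byte[].class joins the whitelist, and anything not in
    // CLASS_TYPES fails fast instead of silently falling back to byte[].
    private static TypeInformation validate(TypeInformation type) {
        for (Class classType : CLASS_TYPES) {
            if (classType == type.getTypeClass()) {
                return type;
            }
        }
        throw new IllegalArgumentException(
            "Unsupported type: " + type.getTypeClass().getName());
    }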


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-25 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97736554
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List> familyMap =
+   new HashMap<String, List>();
+
+   // Allowed types. This may change.
+   // TODO : Check if the Date type should be the one in java.util or the 
one in java.sql
+   private static Class[] CLASS_TYPES = {
+   Integer.class, Short.class, Float.class, Long.class, 
String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, 
BigDecimal.class, Date.class
+   };
+   private static byte[] EMPTY_BYTE_ARRAY = new byte[0];
+   public void addColumns(String family, String qualifier, 
TypeInformation type) {
+   Preconditions.checkNotNull(family, "family name");
+   Preconditions.checkNotNull(family, "qualifier name");
+   Preconditions.checkNotNull(type, "type name");
+   List list = this.familyMap.get(family);
+   if (list == null) {
+   list = new ArrayList();
+   }
+   boolean found = false;
+   for(Class classType : CLASS_TYPES) {
+   if(classType == type.getTypeClass()) {
+   found = true;
+   break;
+   }
+   }
+   if(!found) {
+   // by default it will be byte[] type only
+   type = BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO;
+   }
+   list.add(new Pair(qualifier, type));
+   familyMap.put(family, list);
+   }
+
+   public Map<String, List> getFamilyMap() {
+   return this.familyMap;
+   }
+
+   public Object deserialize(byte[] value, TypeInformation typeInfo) {
+   if (typeInfo.isBasicType()) {
+   if (typeInfo.getTypeClass() == Integer.class) {
+   return Bytes.toInt(value);
+   } else if (typeInfo.getTypeClass() == Short.class) {
+   return Bytes.toShort(value);
+   } else if (typeInfo.getTypeClass() == Float.class) {
+   return Bytes.toFloat(value);
+   } else if (typeInfo.getTypeClass() == Long.class) {
+   return Bytes.toLong(value);
+   } else if (typeInfo.getTypeClass() == String.class) {
+   return Bytes.toString(value);
+   } else if (typeInfo.getTypeClass() == Byte.class) {
+   return value[0];
+   } else if (typeInfo.getTypeClass() == Boolean.class) {
+   return Bytes.toBoolean(value);
+   } else if (typeInfo.getTypeClass() == Double.class) {
+   return Bytes.toDouble(value);
+   } else if (typeInfo.getTypeClass() == BigInteger.class) {
+
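
The quoted diff is cut off by the archive at this point. For reference, a plausible sketch of how the remaining branches of deserialize() could look, assuming the value bytes were written with HBase's Bytes utility (Bytes.toBigDecimal exists there; the BigInteger and java.util.Date conversions below are assumptions, since Bytes has no direct helpers for them):

			} else if (typeInfo.getTypeClass() == BigInteger.class) {
				return new BigInteger(value);          // assumes two's-complement encoded bytes
			} else if (typeInfo.getTypeClass() == BigDecimal.class) {
				return Bytes.toBigDecimal(value);
			} else if (typeInfo.getTypeClass() == Date.class) {
				return new Date(Bytes.toLong(value));  // assumes the date was stored as epoch millis
			}
		}
		// non-basic types fall back to the raw byte[]
		return value;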

[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-24 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97709565
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables. Returns the result as {@link Row}
+ */
+public class HBaseTableSourceInputFormat extends TableInputFormat implements ResultTypeQueryable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
+   private String tableName;
+   private transient Connection conn;
+   private transient org.apache.hadoop.conf.Configuration conf;
+   private HBaseTableSchema schema;
+
+   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, HBaseTableSchema schema) {
+   this.tableName = tableName;
+   this.conf = conf;
+   this.schema = schema;
+   }
+
+   @Override
+   public void configure(Configuration parameters) {
+   LOG.info("Initializing HBaseConfiguration");
+   connectToTable();
+   if(table != null) {
+   scan = getScanner();
+   }
+   }
+
+   @Override
+   protected Scan getScanner() {
+   // TODO : Pass 'rowkey'. For this we need FilterableTableSource
+   Scan scan = new Scan();
+   Map<String, List> familyMap = schema.getFamilyMap();
+   for(String family : familyMap.keySet()) {
+   // select only the fields in the 'selectedFields'
+   List colDetails = familyMap.get(family);
+   for(Pair<String, TypeInformation> pair : colDetails) {
+   scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(pair.getFirst()));
+   }
+   }
+   return scan;
+   }
+
+   @Override
+   public String getTableName() {
+   return tableName;
+   }
+
+   @Override
+   protected Row mapResultToTuple(Result res) {
+   List values = new ArrayList();
+   int i = 0;
+   Map<String, List> familyMap = schema.getFamilyMap();
+   Row[] rows = new Row[familyMap.size()];
+   for(String family : familyMap.keySet()) {
+   List colDetails = familyMap.get(family);
+   for(Pair<String, TypeInformation> pair : colDetails) {
+   byte[] value = res.getValue(Bytes.toBytes(family), Bytes.toBytes(pair.getFirst()));
+   if(value != null) {
+

[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-24 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97709590
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
 ---
@@ -19,99 +19,113 @@
 package org.apache.flink.addons.hbase;
 
 import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
-import org.apache.flink.api.common.io.RichInputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.types.Row;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.HRegionLocator;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
+import java.util.Map;
 
 /**
  * {@link InputFormat} subclass that wraps the access for HTables. Returns the result as {@link Row}
  */
-public class HBaseTableSourceInputFormat extends RichInputFormat<Row, TableInputSplit> implements ResultTypeQueryable {
+public class HBaseTableSourceInputFormat extends TableInputFormat implements ResultTypeQueryable {
 
private static final long serialVersionUID = 1L;
 
	private static final Logger LOG = LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
private String tableName;
-   private TypeInformation[] fieldTypeInfos;
-   private String[] fieldNames;
-   private transient Table table;
-   private transient Scan scan;
private transient Connection conn;
-   private ResultScanner resultScanner = null;
-
-   private byte[] lastRow;
-   private int scannedRows;
-   private boolean endReached = false;
-   private org.apache.hadoop.conf.Configuration conf;
-   private static final String COLON = ":";
+   private transient org.apache.hadoop.conf.Configuration conf;
+   private HBaseTableSchema schema;
 
-   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, String[] fieldNames, TypeInformation[] fieldTypeInfos) {
-   this.conf = conf;
+   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, HBaseTableSchema schema) {
this.tableName = tableName;
-   this.fieldNames = fieldNames;
-   this.fieldTypeInfos = fieldTypeInfos;
+   this.conf = conf;
+   this.schema = schema;
}
 
@Override
public void configure(Configuration parameters) {
LOG.info("Initializing HBaseConfiguration");
connectToTable();
if(table != null) {
-   scan = createScanner();
+   scan = getScanner();
}
}
 
-   private Scan createScanner() {
+   @Override
+   protected Scan getScanner() {
+   // TODO : Pass 'rowkey'. For this we need FilterableTableSource
Scan scan = new Scan();
-   for(String field : fieldNames) {
+   Map<String, List> familyMap = schema.getFamilyMap();
+   for(String family : familyMap.keySet()) {
// select only the fields in the 'selectedFields'
-   String[] famCol = field.split(COLON);
-   scan.addColumn(Bytes.toBytes(famCol[0]), Bytes.toBytes(famCol[1]));
+   List colDetails = familyMap.get(family);
+   for(Pair<String,

[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-24 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97709469
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List> familyMap =
+   new HashMap<String, List>();
+
+   // Allowed types. This may change.
+   // TODO : Check if the Date type should be the one in java.util or the one in java.sql
+   private static Class[] CLASS_TYPES = {
+   Integer.class, Short.class, Float.class, Long.class, String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, BigDecimal.class, Date.class
+   };
+   private static byte[] EMPTY_BYTE_ARRAY = new byte[0];
+   public void addColumns(String family, String qualifier, TypeInformation type) {
--- End diff --

Ok. Let me check that. So if we pass Class there we could wrap it with the 
corresponding TypeInformation for our internal usage?
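
For context, deriving a TypeInformation from a plain Class is a one-liner in Flink; a minimal sketch of such a wrapper (the addColumn overload is hypothetical, not part of this PR):

	import org.apache.flink.api.common.typeinfo.TypeInformation;
	import org.apache.flink.api.java.typeutils.TypeExtractor;

	// hypothetical overload that accepts a Class and derives the TypeInformation internally
	public void addColumn(String family, String qualifier, Class<?> clazz) {
		TypeInformation<?> type = TypeExtractor.getForClass(clazz);
		addColumns(family, qualifier, type);
	}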


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-24 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97709482
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List> familyMap =
+   new HashMap<String, List>();
+
+   // Allowed types. This may change.
+   // TODO : Check if the Date type should be the one in java.util or the one in java.sql
+   private static Class[] CLASS_TYPES = {
+   Integer.class, Short.class, Float.class, Long.class, String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, BigDecimal.class, Date.class
+   };
+   private static byte[] EMPTY_BYTE_ARRAY = new byte[0];
+   public void addColumns(String family, String qualifier, TypeInformation type) {
+   Preconditions.checkNotNull(family, "family name");
+   Preconditions.checkNotNull(qualifier, "qualifier name");
+   Preconditions.checkNotNull(type, "type name");
+   List list = this.familyMap.get(family);
+   if (list == null) {
+   list = new ArrayList();
+   }
+   boolean found = false;
+   for(Class classType : CLASS_TYPES) {
+   if(classType == type.getTypeClass()) {
+   found = true;
+   break;
+   }
+   }
+   if(!found) {
+   // by default it will be byte[] type only
+   type = BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO;
--- End diff --

Ok. Got it.
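
If the silent fallback to byte[] ever turns out to be surprising, failing fast is a small change; a sketch of that alternative (an assumption for illustration, not what this PR does):

	if (!found) {
		// requires: import java.util.Arrays;
		throw new IllegalArgumentException("Unsupported type " + type.getTypeClass().getName()
			+ "; supported types are " + Arrays.toString(CLASS_TYPES));
	}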


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-24 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97709518
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 ---
@@ -22,54 +22,63 @@
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.sources.BatchTableSource;
-import org.apache.flink.table.sources.ProjectableTableSource;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Creates a table source that helps to scan data from an hbase table
  *
  * Note : the colNames are specified along with a familyName and they are separated by a ':'
  * For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier name
  */
-public class HBaseTableSource implements BatchTableSource, ProjectableTableSource {
+// TODO : Implement ProjectableTableSource?
+public class HBaseTableSource implements BatchTableSource {
 
private Configuration conf;
private String tableName;
-   private byte[] rowKey;
-   private String[] colNames;
-   private TypeInformation[] colTypes;
+   private HBaseTableSchema schema;
+   private String[] famNames;
 
-   public HBaseTableSource(Configuration conf, String tableName, byte[] rowKey, String[] colNames, TypeInformation[] colTypes) {
+   public HBaseTableSource(Configuration conf, String tableName, HBaseTableSchema schema) {
	this.conf = conf;
	this.tableName = Preconditions.checkNotNull(tableName, "Table name");
-   this.rowKey = Preconditions.checkNotNull(rowKey, "Rowkey");
-   this.colNames = Preconditions.checkNotNull(colNames, "Field names");
-   this.colTypes = Preconditions.checkNotNull(colTypes, "Field types");
+   this.schema = Preconditions.checkNotNull(schema, "Schema");
+   Map<String, List> familyMap = schema.getFamilyMap();
+   famNames = familyMap.keySet().toArray(new String[familyMap.size()]);
}
 
@Override
public TypeInformation getReturnType() {
-   return new RowTypeInfo(colTypes);
-   }
-
-   @Override
-   public DataSet getDataSet(ExecutionEnvironment execEnv) {
-   return execEnv.createInput(new HBaseTableSourceInputFormat(conf, tableName, colNames, colTypes), getReturnType());
-   }
+   // split the fieldNames
+   Map<String, List> famMap = schema.getFamilyMap();
 
-   @Override
-   public ProjectableTableSource projectFields(int[] fields) {
-   String[] newColNames = new String[fields.length];
-   TypeInformation[] newColTypes = new TypeInformation[fields.length];
+   List qualNames = new ArrayList();
--- End diff --

I will look into this.
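
For the record, the composite return type this thread converges on can be assembled by nesting one RowTypeInfo per column family inside an outer RowTypeInfo; a minimal sketch, assuming a generified family map (the actual diff uses raw types):

	Map<String, List<Pair<String, TypeInformation<?>>>> famMap = schema.getFamilyMap();
	TypeInformation<?>[] famTypes = new TypeInformation<?>[famMap.size()];
	String[] names = new String[famMap.size()];
	int i = 0;
	for (Map.Entry<String, List<Pair<String, TypeInformation<?>>>> entry : famMap.entrySet()) {
		List<Pair<String, TypeInformation<?>>> cols = entry.getValue();
		TypeInformation<?>[] qualTypes = new TypeInformation<?>[cols.size()];
		String[] qualNames = new String[cols.size()];
		for (int j = 0; j < cols.size(); j++) {
			qualNames[j] = cols.get(j).getFirst();
			qualTypes[j] = cols.get(j).getSecond();
		}
		names[i] = entry.getKey();
		famTypes[i] = new RowTypeInfo(qualTypes, qualNames);   // nested row per family
		i++;
	}
	return new RowTypeInfo(famTypes, names);                   // outer row keyed by family name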


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-24 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97709503
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 ---
@@ -22,54 +22,63 @@
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.sources.BatchTableSource;
-import org.apache.flink.table.sources.ProjectableTableSource;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Creates a table source that helps to scan data from an hbase table
  *
  * Note : the colNames are specified along with a familyName and they are separated by a ':'
  * For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier name
  */
-public class HBaseTableSource implements BatchTableSource, ProjectableTableSource {
+// TODO : Implement ProjectableTableSource?
+public class HBaseTableSource implements BatchTableSource {
 
private Configuration conf;
private String tableName;
-   private byte[] rowKey;
-   private String[] colNames;
-   private TypeInformation[] colTypes;
+   private HBaseTableSchema schema;
+   private String[] famNames;
 
-   public HBaseTableSource(Configuration conf, String tableName, byte[] rowKey, String[] colNames, TypeInformation[] colTypes) {
+   public HBaseTableSource(Configuration conf, String tableName, HBaseTableSchema schema) {
	this.conf = conf;
	this.tableName = Preconditions.checkNotNull(tableName, "Table name");
-   this.rowKey = Preconditions.checkNotNull(rowKey, "Rowkey");
-   this.colNames = Preconditions.checkNotNull(colNames, "Field names");
-   this.colTypes = Preconditions.checkNotNull(colTypes, "Field types");
+   this.schema = Preconditions.checkNotNull(schema, "Schema");
+   Map<String, List> familyMap = schema.getFamilyMap();
+   famNames = familyMap.keySet().toArray(new String[familyMap.size()]);
--- End diff --

Fine. Will do it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-24 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97709488
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List> familyMap =
+   new HashMap<String, List>();
+
+   // Allowed types. This may change.
+   // TODO : Check if the Date type should be the one in java.util or the one in java.sql
+   private static Class[] CLASS_TYPES = {
+   Integer.class, Short.class, Float.class, Long.class, String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, BigDecimal.class, Date.class
+   };
+   private static byte[] EMPTY_BYTE_ARRAY = new byte[0];
+   public void addColumns(String family, String qualifier, TypeInformation type) {
+   Preconditions.checkNotNull(family, "family name");
+   Preconditions.checkNotNull(family, "qualifier name");
+   Preconditions.checkNotNull(type, "type name");
+   List list = this.familyMap.get(family);
+   if (list == null) {
+   list = new ArrayList();
+   }
+   boolean found = false;
+   for(Class classType : CLASS_TYPES) {
+   if(classType == type.getTypeClass()) {
+   found = true;
+   break;
+   }
+   }
+   if(!found) {
+   // by default it will be byte[] type only
+   type = BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO;
+   }
+   list.add(new Pair(qualifier, type));
--- End diff --

Ok
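
The review remark attached to this line is not preserved in the archive, but the obvious tightening here is to type the raw List and Pair; a sketch of the generified declarations (an assumption about the suggested fix):

	private final Map<String, List<Pair<String, TypeInformation<?>>>> familyMap =
		new HashMap<String, List<Pair<String, TypeInformation<?>>>>();

	// and correspondingly:
	list.add(new Pair<String, TypeInformation<?>>(qualifier, type));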


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-01-24 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
@fhueske , @tonycox , @wuchong 
I have updated the PR based on all the feedback here. It now supports a composite RowType, so multiple column families can be specified along with their qualifier names.
We retrieve the result by doing a full scan. This is not efficient and we need to specify start and end rows; I think that can be done once FilterableTableSource is in place.
I have added test cases that cover a single column family and two column families.
For now, if the TypeInformation is not one of the known types we fall back to plain byte[]; that happens at the validation stage itself. My main remaining concern is how to represent NULL, i.e. when a column is declared with a type but a row has no data for it. For now I handle it by returning the type's MIN_VALUE (Integer, Float, Long), but I don't believe that is right. Feedback and suggestions welcome.
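
Since Flink's Row permits null fields, one alternative to MIN_VALUE placeholders is to surface the missing cell directly; a minimal sketch of that approach when mapping an HBase Result into a Row (an assumption, not what the PR does at this point):

	byte[] value = res.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
	if (value == null) {
		row.setField(i, null);          // Row fields are nullable, so no sentinel value is needed
	} else {
		row.setField(i, schema.deserialize(value, typeInfo));
	}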


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-01-23 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
Good news: with the help of this composite RowType, after modifying my code accordingly and some debugging, I got the basics to work. I will now stitch things together and submit a PR with the updated changes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-01-23 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
Thanks for all the inputs here. I have been trying to make my existing code 
work with the composite RowTypeInfo. Once that is done I will try to introduce 
the HBaseTableSchema.
Also I would like to work on FLINK-3849 (FilterableTableSource) after this 
first version of HBaseTableSource is accepted.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-23 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97304540
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Creates a table source that helps to scan data from an hbase table
+ *
+ * Note : the colNames are specified along with a familyName and they are separated by a ':'
+ * For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier name
+ */
+public class HBaseTableSource implements BatchTableSource, ProjectableTableSource {
+
+   private Configuration conf;
+   private String tableName;
+   private byte[] rowKey;
+   private String[] colNames;
+   private TypeInformation[] colTypes;
+
+   public HBaseTableSource(Configuration conf, String tableName, byte[] rowKey, String[] colNames, TypeInformation[] colTypes) {
+   this.conf = conf;
+   this.tableName = Preconditions.checkNotNull(tableName, "Table name");
+   this.rowKey = Preconditions.checkNotNull(rowKey, "Rowkey");
--- End diff --

I tried this customization, but passing something like f1.q1 was still throwing a validation error; it was fine after I used it as suggested by @tonycox.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-20 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97055300
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Creates a table source that helps to scan data from an hbase table
+ *
+ * Note : the colNames are specified along with a familyName and they are separated by a ':'
+ * For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier name
+ */
+public class HBaseTableSource implements BatchTableSource, ProjectableTableSource {
+
+   private Configuration conf;
+   private String tableName;
+   private byte[] rowKey;
+   private String[] colNames;
+   private TypeInformation[] colTypes;
+
+   public HBaseTableSource(Configuration conf, String tableName, byte[] rowKey, String[] colNames, TypeInformation[] colTypes) {
+   this.conf = conf;
+   this.tableName = Preconditions.checkNotNull(tableName, "Table name");
+   this.rowKey = Preconditions.checkNotNull(rowKey, "Rowkey");
--- End diff --

`new RowTypeInfo(
    new TypeInformation[]{
        new RowTypeInfo(
            new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO},
            new String[]{"name", "age"})
    },
    new String[]{"person"}
);`
I made the above change for the HBaseTableSource's return type and then ran a simple query
`tableEnv.registerTableSource("test", hbaseTable);
Table result = tableEnv.sql("SELECT f1.q1, f1.q2, f1.q3 FROM test");`
It throws an SQL validation error: it treats f1 as a table name and says tableName 'f1' not found.
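
For what it's worth, Calcite resolves a bare f1 in the select list as a table alias, so nested fields of a composite row usually have to be qualified with the table name; a sketch of the query that should validate under that assumption (not taken from this thread):

	Table result = tableEnv.sql("SELECT test.f1.q1, test.f1.q2, test.f1.q3 FROM test");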


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-19 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97015379
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.HRegionLocator;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables. Returns 
the result as {@link Row}
+ */
+public class HBaseTableSourceInputFormat extends RichInputFormat<Row, 
TableInputSplit> implements ResultTypeQueryable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
+   private String tableName;
+   private TypeInformation[] fieldTypeInfos;
+   private String[] fieldNames;
+   private transient Table table;
+   private transient Scan scan;
+   private transient Connection conn;
+   private ResultScanner resultScanner = null;
+
+   private byte[] lastRow;
+   private int scannedRows;
+   private boolean endReached = false;
+   private org.apache.hadoop.conf.Configuration conf;
+   private static final String COLON = ":";
+
+   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, String[] fieldNames, TypeInformation[] fieldTypeInfos) {
+   this.conf = conf;
+   this.tableName = tableName;
+   this.fieldNames = fieldNames;
+   this.fieldTypeInfos = fieldTypeInfos;
+   }
+
+   @Override
+   public void configure(Configuration parameters) {
+   LOG.info("Initializing HBaseConfiguration");
+   connectToTable();
+   if(table != null) {
+   scan = createScanner();
+   }
+   }
+
+   private Scan createScanner() {
+   Scan scan = new Scan();
+   for(String field : fieldNames) {
+   // select only the fields in the 'selectedFields'
+   String[] famCol = field.split(COLON);
+   scan.addColumn(Bytes.toBytes(famCol[0]), Bytes.toBytes(famCol[1]));
+   }
+   return scan;
+   }
+
+   private void connectToTable() {
+   //use files found in the classpath
+   if(this.con
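
The archive truncates the rest of this method; a plausible completion, sketched from the imports visible above (HBaseConfiguration, ConnectionFactory, TableName); the exact error handling is an assumption:

	private void connectToTable() {
		// use files found in the classpath
		if (this.conf == null) {
			this.conf = HBaseConfiguration.create();
		}
		try {
			conn = ConnectionFactory.createConnection(this.conf);
		} catch (IOException ioe) {
			LOG.error("Exception while creating connection to HBase cluster", ioe);
			return;
		}
		try {
			table = conn.getTable(TableName.valueOf(tableName));
		} catch (TableNotFoundException tnfe) {
			LOG.error("The table " + tableName + " was not found", tnfe);
		} catch (IOException ioe) {
			LOG.error("Exception while fetching table " + tableName, ioe);
		}
	}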

[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-19 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97015321
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Creates a table source that helps to scan data from an hbase table
+ *
+ * Note : the colNames are specified along with a familyName and they are separated by a ':'
+ * For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier name
+ */
+public class HBaseTableSource implements BatchTableSource, ProjectableTableSource {
+
+   private Configuration conf;
+   private String tableName;
+   private byte[] rowKey;
+   private String[] colNames;
+   private TypeInformation[] colTypes;
+
+   public HBaseTableSource(Configuration conf, String tableName, byte[] rowKey, String[] colNames, TypeInformation[] colTypes) {
+   this.conf = conf;
+   this.tableName = Preconditions.checkNotNull(tableName, "Table name");
+   this.rowKey = Preconditions.checkNotNull(rowKey, "Rowkey");
--- End diff --

Yes, that is true, but do we always want a full table scan? In HBase it is better to specify a start and end key. So how do we specify that? I have not used this rowKey yet, but I thought it would be better to use it.
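
To make the question concrete, HBase's Scan can bound the key range directly; a minimal sketch of how start/end keys could be applied once the table source knows them (startRow/stopRow are hypothetical fields for illustration):

	Scan scan = new Scan();
	if (startRow != null) {
		scan.setStartRow(startRow);  // inclusive lower bound on the row key
	}
	if (stopRow != null) {
		scan.setStopRow(stopRow);    // exclusive upper bound on the row key
	}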


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-19 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97015267
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.HRegionLocator;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables. Returns the result as {@link Row}
+ */
+public class HBaseTableSourceInputFormat extends RichInputFormat<Row, TableInputSplit> implements ResultTypeQueryable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
+   private String tableName;
+   private TypeInformation[] fieldTypeInfos;
+   private String[] fieldNames;
+   private transient Table table;
+   private transient Scan scan;
+   private transient Connection conn;
+   private ResultScanner resultScanner = null;
+
+   private byte[] lastRow;
+   private int scannedRows;
+   private boolean endReached = false;
+   private org.apache.hadoop.conf.Configuration conf;
+   private static final String COLON = ":";
+
+   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, String[] fieldNames, TypeInformation[] fieldTypeInfos) {
+   this.conf = conf;
+   this.tableName = tableName;
+   this.fieldNames = fieldNames;
+   this.fieldTypeInfos = fieldTypeInfos;
+   }
+
+   @Override
+   public void configure(Configuration parameters) {
+   LOG.info("Initializing HBaseConfiguration");
+   connectToTable();
+   if(table != null) {
+   scan = createScanner();
+   }
+   }
+
+   private Scan createScanner() {
+   Scan scan = new Scan();
+   for(String field : fieldNames) {
+   // select only the fields in the 'selectedFields'
+   String[] famCol = field.split(COLON);
+   scan.addColumn(Bytes.toBytes(famCol[0]), Bytes.toBytes(famCol[1]));
+   }
+   return scan;
+   }
+
+   private void connectToTable() {
+   //use files found in the classpath
+   if(this.con

[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-01-18 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
Thanks for the ping here @tonycox . 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-18 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r96790253
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.HRegionLocator;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables. Returns the result as {@link Row}
+ */
+public class HBaseTableSourceInputFormat extends RichInputFormat<Row, TableInputSplit> implements ResultTypeQueryable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
+   private String tableName;
+   private TypeInformation[] fieldTypeInfos;
+   private String[] fieldNames;
+   private transient Table table;
+   private transient Scan scan;
+   private transient Connection conn;
+   private ResultScanner resultScanner = null;
+
+   private byte[] lastRow;
+   private int scannedRows;
+   private boolean endReached = false;
+   private org.apache.hadoop.conf.Configuration conf;
+   private static final String COLON = ":";
+
+   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, String[] fieldNames, TypeInformation[] fieldTypeInfos) {
+   this.conf = conf;
+   this.tableName = tableName;
+   this.fieldNames = fieldNames;
+   this.fieldTypeInfos = fieldTypeInfos;
+   }
+
+   @Override
+   public void configure(Configuration parameters) {
+   LOG.info("Initializing HBaseConfiguration");
+   connectToTable();
+   if(table != null) {
+   scan = createScanner();
+   }
+   }
+
+   private Scan createScanner() {
+   Scan scan = new Scan();
+   for(String field : fieldNames) {
+   // select only the fields in the 'selectedFields'
+   String[] famCol = field.split(COLON);
+   scan.addColumn(Bytes.toBytes(famCol[0]), Bytes.toBytes(famCol[1]));
+   }
+   return scan;
+   }
+
+   private void connectToTable() {
+   //use files found in the classpath
+   if(this.con

[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-18 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r96790136
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.HRegionLocator;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables. Returns the result as {@link Row}
+ */
+public class HBaseTableSourceInputFormat extends RichInputFormat<Row, TableInputSplit> implements ResultTypeQueryable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
+   private String tableName;
+   private TypeInformation[] fieldTypeInfos;
+   private String[] fieldNames;
+   private transient Table table;
+   private transient Scan scan;
+   private transient Connection conn;
+   private ResultScanner resultScanner = null;
+
+   private byte[] lastRow;
+   private int scannedRows;
+   private boolean endReached = false;
+   private org.apache.hadoop.conf.Configuration conf;
+   private static final String COLON = ":";
+
+   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, String[] fieldNames, TypeInformation[] fieldTypeInfos) {
+   this.conf = conf;
+   this.tableName = tableName;
+   this.fieldNames = fieldNames;
+   this.fieldTypeInfos = fieldTypeInfos;
+   }
+
+   @Override
+   public void configure(Configuration parameters) {
+   LOG.info("Initializing HBaseConfiguration");
+   connectToTable();
+   if(table != null) {
+   scan = createScanner();
+   }
+   }
+
+   private Scan createScanner() {
+   Scan scan = new Scan();
+   for(String field : fieldNames) {
+   // select only the fields in the 'selectedFields'
+   String[] famCol = field.split(COLON);
+   scan.addColumn(Bytes.toBytes(famCol[0]), Bytes.toBytes(famCol[1]));
+   }
+   return scan;
+   }
+
+   private void connectToTable() {
+   //use files found in the classpath
+   if(this.con
