[GitHub] flink issue #3760: FLINK-5752 Support push down projections for HBaseTableSo...
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3760 @fhueske Do you have some time to check this now that the Flink 1.3 release is out? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2332 So I think, at a first level, should we support only put/delete mutations for streaming? Since I am not aware of how Flink users currently interact with HBase, I am not sure which HBase operations should be supported. From your case it seems puts/deletes are the common cases.
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2332 @nragon I agree. But does your use case have increments/appends?
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2332 I can take this up and come up with a design doc. Reading through the comments here and the final decision points, I think only puts/deletes can be considered idempotent; increments/decrements cannot. We may then need two types of sink: one that supports puts/deletes, and another that supports non-idempotent ops. Coming to @nielsbasjes's issue of not being able to flush the buffered mutator - should you always call BufferedMutator#flush() on every checkpoint call?
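To make the idempotency distinction above concrete, here is a minimal, self-contained sketch in plain Java (no HBase dependency; the `put`/`increment` helpers below are hypothetical stand-ins for HBase mutations, modeling a table as a map). When a sink replays records after recovery, replaying a put leaves the table unchanged, while replaying an increment double-counts:

```java
import java.util.HashMap;
import java.util.Map;

public class IdempotencyDemo {

    // A "put" overwrites the cell: applying it twice leaves the table unchanged.
    static void put(Map<String, Long> table, String rowKey, long value) {
        table.put(rowKey, value);
    }

    // An "increment" adds to the cell: applying it twice changes the result.
    static void increment(Map<String, Long> table, String rowKey, long delta) {
        table.merge(rowKey, delta, Long::sum);
    }

    public static void main(String[] args) {
        Map<String, Long> table = new HashMap<>();

        // Simulate a replay after recovery: the same record is applied twice.
        put(table, "row1", 42L);
        put(table, "row1", 42L);
        System.out.println(table.get("row1")); // 42 -- idempotent

        increment(table, "row2", 1L);
        increment(table, "row2", 1L);
        System.out.println(table.get("row2")); // 2 -- not idempotent
    }
}
```

This is why a sink that replays records on failure can safely re-emit puts/deletes, but needs extra care (e.g. exactly-once delivery or deduplication) for increments/appends.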
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2332 @delding Are you still working on this? @nragon Let me know how I can be of help here. I can work on this PR too, since I have some context on the existing PR, though it is stale.
[GitHub] flink pull request #3881: FLINK-6284 Incorrect sorting of completed checkpoi...
Github user ramkrish86 closed the pull request at: https://github.com/apache/flink/pull/3881
[GitHub] flink issue #3881: FLINK-6284 Incorrect sorting of completed checkpoints in ...
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3881 I won't be available for the next 2 to 3 hours, so feel free to decide based on your convenience in case you need to make the RC for the 1.3 release. I am sorry that I could not make an initial commit that took care of things properly; I should have been more careful. Thanks for the opportunity.
[GitHub] flink issue #3881: FLINK-6284 Incorrect sorting of completed checkpoints in ...
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3881 @tillrohrmann Thanks for the new PR. I just executed your change with 101, 99, 100 as the checkpoint order. In this case 100 should be the latest one, even though the actual ids are not sorted. But with your change and my earlier commit it will always sort to 99, 100, 101. Can you take a look at my latest commit? It is based on czxid (as per your suggestion), and I think that makes sense: whatever the actual id, the znode that was created most recently in ZooKeeper will be the latest checkpoint. But I am not very sure whether the checkpoint ids will really be added in a non-sorted way, i.e. whether 100 can be the latest one when 101 is also there.
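To illustrate the underlying sorting bug being discussed: the znode names here are the raw (unpadded) checkpoint ids as strings, so a lexicographic sort orders "100" before "99". A self-contained sketch (hypothetical helper name, no ZooKeeper dependency) of converting the child names to longs before sorting, as the patch does:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class CheckpointSortDemo {

    // Numeric sort: parse each child znode name into a long first.
    static List<Long> sortNumerically(List<String> children) {
        List<Long> ids = new ArrayList<>(children.size());
        for (String child : children) {
            ids.add(Long.valueOf(child));
        }
        Collections.sort(ids);
        return ids;
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList("101", "99", "100");

        // Lexicographic (string) sort: "100" < "101" < "99" -- wrong order.
        List<String> lexicographic = new ArrayList<>(children);
        Collections.sort(lexicographic);
        System.out.println(lexicographic);             // [100, 101, 99]

        // Numeric sort: 99 < 100 < 101 -- the intended order.
        System.out.println(sortNumerically(children)); // [99, 100, 101]
    }
}
```

Sorting by czxid sidesteps the parsing entirely by ordering znodes by creation time, which is why it also handles the "100 created after 101" case raised above.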
[GitHub] flink pull request #3881: FLINK-6284 Incorrect sorting of completed checkpoi...
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3881#discussion_r116211254 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- @@ -346,17 +346,20 @@ public int exists(String pathInZooKeeper) throws Exception { } else { // Initial cVersion (number of changes to the children of this node) int initialCVersion = stat.getCversion(); - - List<String> children = ZKPaths.getSortedChildren( - client.getZookeeperClient().getZooKeeper(), - ZKPaths.fixForNamespace(client.getNamespace(), "/")); - - for (String path : children) { - path = "/" + path; + List<String> childrenInStr = + client.getZookeeperClient().getZooKeeper(). + getChildren(ZKPaths.fixForNamespace(client.getNamespace(), "/"), false); + List<Long> children = new ArrayList<>(childrenInStr.size()); + for (String childNode : childrenInStr) { + children.add(new Long(childNode)); --- End diff -- Ok, I see. I am not sure about this MesosWorker. For czxid, I am not sure if we have an API; if so, we can directly use it. Will be back.
[GitHub] flink pull request #3881: FLINK-6284 Incorrect sorting of completed checkpoi...
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3881#discussion_r116211132 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- @@ -346,17 +346,20 @@ public int exists(String pathInZooKeeper) throws Exception { } else { // Initial cVersion (number of changes to the children of this node) int initialCVersion = stat.getCversion(); - - List<String> children = ZKPaths.getSortedChildren( - client.getZookeeperClient().getZooKeeper(), - ZKPaths.fixForNamespace(client.getNamespace(), "/")); - - for (String path : children) { - path = "/" + path; + List<String> childrenInStr = + client.getZookeeperClient().getZooKeeper(). + getChildren(ZKPaths.fixForNamespace(client.getNamespace(), "/"), false); + List<Long> children = new ArrayList<>(childrenInStr.size()); + for (String childNode : childrenInStr) { + children.add(new Long(childNode)); + } --- End diff -- Here again, it is my bad. I lost my previous changes because of a compile issue, so I lost this. I have already pushed a new commit for the sorting.
[GitHub] flink pull request #3881: FLINK-6284 Incorrect sorting of completed checkpoi...
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3881#discussion_r116211032 --- Diff: pom.xml --- @@ -101,7 +101,8 @@ under the License. 0.7.4 5.0.4 3.4.6 - 2.12.0 + 2.11.0 --- End diff -- Oh... my environment was not able to fetch 2.12.0, so to make things compile I included this change. Will revert it.
[GitHub] flink pull request #3881: FLINK-6284 Incorrect sorting of completed checkpoi...
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3881#discussion_r116188234 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- @@ -346,11 +346,7 @@ public int exists(String pathInZooKeeper) throws Exception { } else { // Initial cVersion (number of changes to the children of this node) int initialCVersion = stat.getCversion(); - - List<String> children = ZKPaths.getSortedChildren( - client.getZookeeperClient().getZooKeeper(), - ZKPaths.fixForNamespace(client.getNamespace(), "/")); - + List<String> children = client.getZookeeperClient().getZooKeeper().getChildren(ZKPaths.fixForNamespace(client.getNamespace(), "/"), false); --- End diff -- Let me do it my older way. I had a patch, but I thought this was better; I had only checked the javadoc of ZKPaths. I will push my initial version of the patch, which converts the List<String> to List<Long> and then uses that as the sorted list.
[GitHub] flink pull request #3881: FLINK-6284 Incorrect sorting of completed checkpoi...
GitHub user ramkrish86 opened a pull request: https://github.com/apache/flink/pull/3881 FLINK-6284 Incorrect sorting of completed checkpoints in ZooKeeperCompletedCheckpointStore Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed Making use of ZooKeeper's getChildren() API directly so that it just returns the children in sequence order. If we go with the ZKPaths API then we need to do some sorting by converting the List<String> to List<Long>.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/ramkrish86/flink FLINK-6284 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3881.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3881 commit 33bf37a2d706af6c8eb6cbe9d58aa3ac9d1f03e0 Author: Ramkrishna <ramkrishna.s.vasude...@intel.com> Date: 2017-05-12T08:18:16Z FLINK-6284 Incorrect sorting of completed checkpoints in ZooKeeperCompletedCheckpointStore
[GitHub] flink issue #3760: FLINK-5752 Support push down projections for HBaseTableSo...
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3760 @fhueske Thanks for the update. Got it. But I would like to say that if there are any issues/JIRAs I could help with for the 1.3 release fork, I would be happy to help. Please point me to those you think I can help with; I can have a look and commit to whatever I can spend time on.
[GitHub] flink issue #3760: FLINK-5752 Support push down projections for HBaseTableSo...
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3760 I checked the recent failure. `Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 20.417 sec <<< FAILURE! - in org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest` This failure seems unrelated to the changes in this patch. @tonycox , @fhueske Just a gentle ping.
[GitHub] flink issue #3760: FLINK-5752 Support push down projections for HBaseTableSo...
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3760 @tonycox Thanks for the comment. Just curious: any reason for the repush? Is it to re-check the test failures?
[GitHub] flink issue #3760: FLINK-5752 Support push down projections for HBaseTableSo...
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3760 The failures do not seem directly related.
[GitHub] flink issue #3760: FLINK-5752 Support push down projections for HBaseTableSo...
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3760 @fhueske , @tonycox Can you have a look at this PR? Thanks.
[GitHub] flink pull request #3760: FLINK-5752 Support push down projections for HBase...
GitHub user ramkrish86 opened a pull request: https://github.com/apache/flink/pull/3760 FLINK-5752 Support push down projections for HBaseTableSource (Ram) Ran mvn clean verify -DskipTests. In this patch I do `Arrays.sort(nestedFields[i]);` before calling addColumns for the new projected table source, because the columns arrive in reverse-sorted order, and applying them as-is to the new projected table source triggers an assertion error while creating the new Calcite program with the projected columns: `assert expr.getType().getFieldList().get(field.getIndex()) == field;` Sorting corrects those issues and the tests run fine. But this nestedFields that is passed to the projectNestedFields() API is created by `def getProjectedFields: Array[Array[String]] ` in RexProgramExtractor. I am still trying to understand what it does.
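As a small illustration of the fix described above, here is a self-contained Java sketch of what sorting each inner array of `nestedFields` does (the field names below are hypothetical examples, not taken from the patch):

```java
import java.util.Arrays;

public class NestedFieldSortDemo {

    // Sort each inner array of projected field names in place,
    // mirroring the Arrays.sort(nestedFields[i]) call described in the PR.
    static void sortEachRow(String[][] nestedFields) {
        for (int i = 0; i < nestedFields.length; i++) {
            Arrays.sort(nestedFields[i]);
        }
    }

    public static void main(String[] args) {
        // Hypothetical projected columns, arriving in reverse-sorted order.
        String[][] nestedFields = {
            {"fc2.col3", "fc1.col1"},
            {"fc1.col2"}
        };
        sortEachRow(nestedFields);
        System.out.println(Arrays.deepToString(nestedFields));
        // [[fc1.col1, fc2.col3], [fc1.col2]]
    }
}
```

With the inner arrays in ascending order, the field positions of the projected table source line up with the Calcite row type, which is what the failing assertion checks.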
[GitHub] flink issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGr...
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3334 @StephanEwen No problem. I appreciate your time and efforts.
[GitHub] flink issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGr...
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3334 @StephanEwen I saw in another JIRA one of your comments where you talked about refactoring CheckpointCoordinator and PendingCheckpoint. So would you like this PR to wait till then?
[GitHub] flink pull request #3478: Flink 4816 Executions failed from "DEPLOYING" shou...
GitHub user ramkrish86 opened a pull request: https://github.com/apache/flink/pull/3478 Flink 4816 Executions failed from "DEPLOYING" should retain restored checkpoint information Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed Does just what the description says. Feedback/suggestions are welcome. Ran 'mvn clean verify -DskipTests'; no static check failures.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/ramkrish86/flink FLINK-4816 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3478.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3478 commit 8cc66b918628fc1c5febb9408829c73e2ad59c46 Author: Ramkrishna <ramkrishna.s.vasude...@intel.com> Date: 2017-03-06T11:25:37Z FLINK-4816 Executions failed from "DEPLOYING" should retain restored checkpoint information commit 5a1bb31668dc81224c2aea870213a7cba1be3352 Author: Ramkrishna <ramkrishna.s.vasude...@intel.com> Date: 2017-03-06T11:41:58Z Add lock to getRestoredCheckpointID
[GitHub] flink issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGr...
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3334 Just updated and did a force push to avoid the merge commit. Now things are fine.
[GitHub] flink issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGr...
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3334 Ping for reviews here!
[GitHub] flink issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGr...
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3334 @StephanEwen , @wenlong88 , @shixiaogang Please have a look at the latest push. I am now tracking the failures in checkpointing and incrementing a new counter based on them. I have also added test cases. I have not changed the constructors of the affected classes because that touches many files; I can update it based on feedback on the latest PR.
[GitHub] flink issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGr...
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3334 I think I got a better way to track this. Will update the PR soon.
[GitHub] flink issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGr...
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3334 Thanks for the input. I read the code. There are two ways a checkpoint fails (as per my understanding of the code). If for some reason checkpointing cannot be performed, we send a DeclineCheckpoint message, which is handled by the CheckpointCoordinator. The other is if there is an external error during checkpointing, in which case we call failExternally, which transitions the state to FAILED, closes all the watchdogs, and cancels the invokable as well. Now, is the intent to track how many times this happens, and if so, to track such failure occurrences and then fail the execution graph?
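The counting logic being discussed can be sketched as follows. This is a minimal, self-contained illustration with hypothetical class and method names (not Flink's actual API): a counter incremented on each failed checkpoint, reset on success, with the caller failing the execution graph once a configured maximum is exceeded.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class UnsuccessfulCheckpointTracker {

    private final int maxUnsuccessfulCheckpoints;
    private final AtomicInteger numUnsuccessful = new AtomicInteger(0);

    public UnsuccessfulCheckpointTracker(int maxUnsuccessfulCheckpoints) {
        this.maxUnsuccessfulCheckpoints = maxUnsuccessfulCheckpoints;
    }

    // Called when a checkpoint completes: reset the failure streak.
    public void onCheckpointSuccess() {
        numUnsuccessful.set(0);
    }

    // Called when a checkpoint is declined or fails externally.
    // Returns true when the caller should fail the execution graph.
    public boolean onCheckpointFailure() {
        return numUnsuccessful.incrementAndGet() > maxUnsuccessfulCheckpoints;
    }

    public static void main(String[] args) {
        UnsuccessfulCheckpointTracker tracker = new UnsuccessfulCheckpointTracker(2);
        System.out.println(tracker.onCheckpointFailure()); // false (1 failure)
        System.out.println(tracker.onCheckpointFailure()); // false (2 failures)
        System.out.println(tracker.onCheckpointFailure()); // true  (3 > max)
        tracker.onCheckpointSuccess();                     // streak resets
        System.out.println(tracker.onCheckpointFailure()); // false (1 failure)
    }
}
```

Whether to count DeclineCheckpoint messages, failExternally calls, or both is exactly the open question in this thread; the sketch is agnostic to which path invokes `onCheckpointFailure()`.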
[GitHub] flink issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGr...
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3334 I think I got what you are saying here. Since Execution#triggerCheckpoint is the actual checkpoint call and we currently don't track whether it fails, your point is that it is better to know if there was a failure in the actual checkpoint triggering at the Task level and then count that as a failure. Am I right, @wenlong88?
[GitHub] flink issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGr...
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3334 @wenlong88 Can you explain more about what you mean by checkpointing failure vs. trigger failure? If you are talking about tracking the number of times the execution fails after restoring from a checkpoint, I think FLINK-4815 is trying to address that.
[GitHub] flink pull request #3334: FLINK-4810 Checkpoint Coordinator should fail Exec...
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3334#discussion_r103638771 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -537,12 +562,27 @@ else if (!props.forceCheckpoint()) { if (!checkpoint.isDiscarded()) { checkpoint.abortError(new Exception("Failed to trigger checkpoint")); } + if(numUnsuccessful > maxUnsuccessfulCheckpoints) { + return failExecution(executions); + } return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION); } } // end trigger lock } + private CheckpointTriggerResult failExecution(Execution[] executions) { + if (currentPeriodicTrigger != null) { + currentPeriodicTrigger.cancel(); + currentPeriodicTrigger = null; + } + for (Execution execution : executions) { + // fail the graph + execution.fail(new Throwable("The number of max unsuccessful checkpoints attempts exhausted")); --- End diff -- I verified the code once again. There is no reference to ExecutionGraph in CheckpointCoordinator, and calling fail on the current Execution actually triggers the restart flow: Execution#fail() marks the state FAILED, then vertex#executionFailed(), then graph#jobVertexInFinalState(). So you think this way of failing won't work?
[GitHub] flink pull request #3334: FLINK-4810 Checkpoint Coordinator should fail Exec...
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3334#discussion_r103612421 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -537,12 +562,27 @@ else if (!props.forceCheckpoint()) { if (!checkpoint.isDiscarded()) { checkpoint.abortError(new Exception("Failed to trigger checkpoint")); } + if(numUnsuccessful > maxUnsuccessfulCheckpoints) { + return failExecution(executions); + } return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION); } } // end trigger lock } + private CheckpointTriggerResult failExecution(Execution[] executions) { + if (currentPeriodicTrigger != null) { + currentPeriodicTrigger.cancel(); + currentPeriodicTrigger = null; + } + for (Execution execution : executions) { + // fail the graph + execution.fail(new Throwable("The number of max unsuccessful checkpoints attempts exhausted")); --- End diff -- Ok sure. I will add tests for this.
[GitHub] flink pull request #3334: FLINK-4810 Checkpoint Coordinator should fail Exec...
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3334#discussion_r103612320 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -121,6 +121,8 @@ /** The maximum number of checkpoints that may be in progress at the same time */ private final int maxConcurrentCheckpointAttempts; + /** The maximum number of unsuccessful checkpoints */ + private final int maxUnsuccessfulCheckpoints; --- End diff -- ok.
[GitHub] flink issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGr...
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3334 @StephanEwen - Ping for initial reviews. Will work on it based on the feedback.
[GitHub] flink pull request #3334: FLINK-4810 Checkpoint Coordinator should fail Exec...
GitHub user ramkrish86 opened a pull request: https://github.com/apache/flink/pull/3334

FLINK-4810 Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

Ran mvn clean verify. Did not add test cases yet; looking for first-level feedback.
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ramkrish86/flink FLINK-4810

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3334.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3334

commit 6e0fb38272e6bb59528065461c6ec6fdd43689ad
Author: Ramkrishna <ramkrishna.s.vasude...@intel.com>
Date: 2017-02-16T11:29:37Z

    FLINK-4810 Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 Thanks to @fhueske, @tonycox, and @wuchong for helping to get this in.
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 @fhueske - Are you fine with that pom change? If so we can get this in.
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 I think that is not the only reason. Somewhere, either these tests are creating more static objects or bigger objects that live for the lifetime of the JVM; maybe this test just exposes it. Actually, this change in MaxPermSize calls for a discussion on what the 'mvn' settings should be and what minimum heap size is required. Also, in other projects the default JDK version has now moved to 8, at least for the trunk version. I think a similar thing can be done here. Thanks @tonycox.
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 @fhueske - so are you OK with @tonycox's suggestion of setting MaxPermSize for the HBase module?
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 Even jhat was not able to view the file, as it had a problem parsing the hprof file. So my question is: if MaxPermSize is 128M, why does it work with JDK 8?
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 I tried multiple ways to take a dump file from this mvn test command run. I get an hprof file which, on opening in a heap dump analyser, throws an EOF exception or an NPE. @tonycox - were you able to get any heap dump?
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 I tried to compare TableInputFormat and the new one. But the interesting part is that HBaseInputFormat does not fail on JDK 8, which was the default in my test env.
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 Thanks to @tonycox for helping me reproduce this error. Changing to JDK 7 creates this issue, and it occurs because the PermGen space runs out of memory. I don't have a solution for this. It runs with JDK 8 with no hassles. Any inputs here?
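If @tonycox's per-module MaxPermSize suggestion were adopted, one way to express it is a surefire argLine override in the flink-hbase pom. This is only a sketch — the exact sizes would need the discussion mentioned above — and note that JDK 8 removed PermGen and ignores `-XX:MaxPermSize`, which is consistent with the failure appearing only on JDK 7:

```xml
<!-- Hypothetical surefire configuration for the flink-hbase module;
     values are placeholders, not a vetted recommendation. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-surefire-plugin</artifactId>
  <configuration>
    <argLine>-Xmx1024m -XX:MaxPermSize=256m</argLine>
  </configuration>
</plugin>
```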
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149

> Results :
> Tests run: 2, Failures: 0, Errors: 0, Skipped: 0

This is what I get as the test result.
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 It is CentOS Linux release 7.0.1406 (Core).
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 I tried even that. The test runs fine for me; I get no OOME.
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149

    02/02/2017 06:19:20  DataSink (collect())(6/32) switched to SCHEDULED
    02/02/2017 06:19:20  DataSink (collect())(2/32) switched to SCHEDULED
    02/02/2017 06:19:20  DataSink (collect())(8/32) switched to SCHEDULED
    02/02/2017 06:19:20  DataSink (collect())(6/32) switched to DEPLOYING
    02/02/2017 06:19:20  DataSink (collect())(8/32) switched to DEPLOYING
    02/02/2017 06:19:20  DataSink (collect())(2/32) switched to DEPLOYING
    02/02/2017 06:19:25  DataSink (collect())(1/32) switched to SCHEDULED
    02/02/2017 06:19:25  DataSink (collect())(1/32) switched to DEPLOYING
    Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "JvmPauseMonitor"
    Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "hconnection-0x2247c79b-shared--pool20-t1"
    Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "JvmPauseMonitor"
    Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "flink-akka.actor.default-dispatcher-3"
    Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "IPC Server handler 1 on 34267"
    Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "CHAIN DataSource (localhost:53456)"
    Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "CHAIN DataSource (localhost:53456)"
    Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "CHAIN DataSource (localhost:53456)"
    Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "org.apache.hadoop.hdfs.PeerCache@28360f4a"

The same tests run cleanly in my linux box. Is there any place I can download the logs to see what could be the issue?
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149

> Will need to figure out what's the reason for that before I can merge the PR.

I tried running those tests again in my linux box and all went through without any error.
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 @fhueske - A gentle reminder!
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 @fhueske - Please have a look at the javadoc.
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149

> For now I'd suggest to keep the scope of the PR as it is right now. A bit more Java documentation on HBaseTableSource to explain how it is used would be great. We can implement the NestedFieldsProjectableTableSource and the changes to HBaseTableSource in a follow up issue.

+1 for this. I can add some more javadoc to it. BTW, I am trying to check out these projections using the ProjectableTableSource. Will be back on it. Thanks to everyone for all the comments and feedback.
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149

> ProjectableTableSource works in scan process.

Ya, got it. I was just trying to relate it to this HBase case, and found that we try to read all columns, then do a flatMap, and then return only the required columns. Just read that PR to understand better.
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 Going through the PR for https://issues.apache.org/jira/browse/FLINK-3848, I think we try to project only the required columns there. We could do similarly here. So my and @tonycox's suggestion of having a new kind of ProjectableTableSource could help here.
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149

    Table result = tableEnv
        .sql("SELECT test1.f1.q1, test1.f2.q2 FROM test1 where test1.f1.q1 < 103");

I just tried this query and it works with or without ProjectableTableSource. So I just wanted to know when projection comes into play. I thought queries of this sort might not work without projection.
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149

> Regarding the HBaseTableSchema, we could also use it only internally and not expose it to the user. The HBaseTableSource would have a method addColumn() and forward the calls to its internal HBaseSchema.

Have done this. I initially thought to do things at construction time itself; now I have added an addColumn() in HBaseTableSource. HBaseSchema becomes totally package-private with no access for users. Regarding the flat schema: generally in HBase only the family is required and the qualifiers are dynamic, but here, for the sake of accessibility, we expect the user to specify the column names (trying to give it a relational look). It is in the case of projections that we have some issues. In my opinion, if we had a better API for projection we could handle it better? The current nested way, as you said, is better in the sense that all columns of a family are grouped together.
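The addColumn()-plus-internal-schema arrangement described above can be sketched with plain collections. This is a simplified stand-in, not the actual HBaseTableSchema API; the TreeMap illustrates the natural String ordering of families that comes up elsewhere in this thread:

```java
import java.util.LinkedHashMap;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical stand-in for the package-private schema: the table source
// exposes addColumn() and forwards here. The TreeMap keeps families in
// natural String order, which for plain ASCII names matches HBase's
// byte-wise ordering.
public class SchemaSketch {
    private final SortedMap<String, LinkedHashMap<String, Class<?>>> familyMap = new TreeMap<>();

    public void addColumn(String family, String qualifier, Class<?> type) {
        familyMap.computeIfAbsent(family, f -> new LinkedHashMap<>()).put(qualifier, type);
    }

    public Set<String> getFamilyNames() {
        return familyMap.keySet(); // sorted by natural String order
    }
}
```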
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 Just a general question:

`def projectFields(fields: Array[Int]): ProjectableTableSource[T]`

Is it mandatory to only have an int[] here? Can we have a String[] that allows specifying specific names? Maybe even more generic would be a ColumnarTableProjectableSource which allows specifying the family-to-column mapping in some way. Ultimately it is the table source that is going to do the mapping and create the projected table source, so that should be fine?
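For illustration, bridging an int[]-based projectFields to the family:qualifier names an HBase source needs could look like the helper below. This is hypothetical, not Flink's API; it assumes the source keeps a flattened, ordered list of its "family:qualifier" column names:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: resolve the int[] passed to projectFields against
// the source's own ordered "family:qualifier" column list, so the source
// can scan only the projected columns.
public class ProjectionSketch {
    static List<String> project(List<String> allColumns, int[] fields) {
        List<String> projected = new ArrayList<>();
        for (int f : fields) {
            projected.add(allColumns.get(f)); // index -> "family:qualifier" name
        }
        return projected;
    }
}
```

With such a mapping inside the source, an int[]-only interface is workable even without a String[] overload, since the source itself owns the index-to-name translation.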
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 Updated the PR fixing the comments. The comments were simple, but adding AbstractTableInputFormat and moving the code back and forth makes this a bigger change. Internally they are just refactorings. The test cases pass.
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 To understand better:

> We could make flat schema an optional mode or implement it as a separate TableSource as well.

and this one:

> This could be solved if we use a flat schema and encode the nesting as columnFamily$column

Are you talking about using separators for it? Maybe I am not getting your concern here. I agree that a nested schema is the better API, but if we go with a flat schema then maintaining the family-to-qualifier relation may not be easy. As you said, a separate TableSource where we define such things would be better. Regarding HBaseTableSchema, I think keeping it is better so that we can modify that class for better serialization and deserialization by adding more logic for different types. Even if we go with a flat schema, I think this type of class would help us maintain the family-to-qualifier mapping?
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r98828506 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java --- @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.types.Row; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * {@link InputFormat} subclass that wraps the access for HTables. 
Returns the result as {@link Row}
+ */
+public class HBaseTableSourceInputFormat extends TableInputFormat implements ResultTypeQueryable {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
+	private String tableName;
+	private transient Connection conn;
+	private transient org.apache.hadoop.conf.Configuration conf;
+	private HBaseTableSchema schema;
+
+	public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, HBaseTableSchema schema) {
+		this.tableName = tableName;
+		this.conf = conf;
+		this.schema = schema;
+	}
+
+	@Override
+	public void configure(Configuration parameters) {
+		LOG.info("Initializing HBaseConfiguration");
+		connectToTable();
+		if (table != null) {
+			scan = getScanner();
+		}
+	}
+
+	@Override
+	protected Scan getScanner() {
+		Scan scan = new Scan();
+		for (String family : schema.getFamilyNames()) {
--- End diff --

Actually, in HBase everything is lexicographically sorted (as byte[]). But here, since we represent families as Strings, the map we use in HBaseTableSchema sorts them in natural order. The results we retrieve will always be lexicographically sorted, and we retrieve the specific cell from the result using:

> res.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
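The ordering point above — HBase compares keys as unsigned byte arrays, while a String-keyed map compares in natural order — can be checked with a small stdlib snippet; for plain ASCII family names like "Department" and "Education" the two orders agree:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class OrderSketch {
    // Sign of the comparison HBase would use: unsigned, byte-wise
    // over the UTF-8 encoding of the name.
    static int byteOrder(String a, String b) {
        return Integer.signum(Arrays.compareUnsigned(
            a.getBytes(StandardCharsets.UTF_8),
            b.getBytes(StandardCharsets.UTF_8)));
    }
}
```

For names outside plain ASCII the two orders can diverge, which is why relying on natural String order in the schema is only safe under that assumption.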
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r98828345 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java --- @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables. Returns the result as {@link Row}
+ */
+public class HBaseTableSourceInputFormat extends TableInputFormat implements ResultTypeQueryable {
--- End diff --

Here, by Row do you mean the generic type passed to this class? Because the HBase table also has a concept of a row, which we are not using now. So I feel the current name is better. What do you think?
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r98828283

--- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java ---

@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Map;
+
+/**
+ * Creates a table source that helps to scan data from an hbase table
+ *
+ * Note : the colNames are specified along with a familyName and they are seperated by a ':'
--- End diff --

True. Will remove.
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 @tonycox I have addressed all your latest comments, including making HBaseTableSource a ProjectableTableSource. @wuchong, @fhueske - are you fine with the latest updates? If so, we can try closing this PR; further discussions on adding a StreamingTableSource and supporting WHERE clauses can happen in subsequent PRs.
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r98487401 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Time; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.ArrayList; +import java.sql.Date; + +/** + * Helps to specify an HBase Table's schema + */ +public class HBaseTableSchema implements Serializable { + + // A Map with key as column family. + private final Map<String, List<Pair<String, TypeInformation>>> familyMap = --- End diff -- > I think PushProjectIntoBatchTableSourceScanRule is not good enough for nested data types. but we can project at least family columns now. 
So you mean we will project the column families. One thing to remember: even though we add the two families as Education and Department, Department comes back first and Education second when we retrieve the result, so what matters is the order in which HBase returns the result.
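The ordering point above can be shown without any HBase dependency. HBase keeps column families sorted by their byte[] name, so regardless of registration order, "Department" is retrieved before "Education"; a TreeSet is a simple stand-in for that lexicographic ordering (the class and method names here are illustrative only):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

// Sketch: model HBase's lexicographic family ordering with a TreeSet.
public class FamilyOrder {

    // returns the families in the order HBase would hand them back
    public static List<String> retrievalOrder(String... families) {
        return new ArrayList<>(new TreeSet<>(Arrays.asList(families)));
    }

    public static void main(String[] args) {
        // registration order does not matter; retrieval order is lexicographic
        System.out.println(retrievalOrder("Education", "Department"));
    }
}
```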
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r98486712 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java --- @@ -0,0 +1,140 @@ + // A Map with key as column family. + private final Map<String, List<Pair<String, TypeInformation>>> familyMap = + new HashMap<>(); + + // Allowed types. This may change.
+ // TODO : Check if the Date type should be the one in java.util or the one in java.sql + private static Class[] CLASS_TYPES = { + Integer.class, Short.class, Float.class, Long.class, String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, BigDecimal.class, Date.class, Time.class, byte[].class --- End diff -- OK, I can do that. I have not used these ImmutableCollections much; in fact, in our project we removed the dependency on them, which is why I went with a plain array by default.
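A hedged sketch of the reviewer's suggestion as I read it: expose the allowed qualifier types as an unmodifiable Set rather than a raw Class[] array, so validation becomes a contains() lookup. This version uses only the JDK, avoiding the Guava ImmutableCollection dependency the comment mentions:

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Date;
import java.sql.Time;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Sketch: an unmodifiable Set of the allowed qualifier types.
public class AllowedTypes {

    private static final Set<Class<?>> CLASS_TYPES = Collections.unmodifiableSet(
        new HashSet<Class<?>>(Arrays.<Class<?>>asList(
            Integer.class, Short.class, Float.class, Long.class, String.class,
            Byte.class, Boolean.class, Double.class, BigInteger.class,
            BigDecimal.class, Date.class, Time.class, byte[].class)));

    public static boolean isAllowed(Class<?> clazz) {
        return CLASS_TYPES.contains(clazz);
    }
}
```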
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r98486522 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java --- @@ -0,0 +1,140 @@ + // A Map with key as column family. + private final Map<String, List<Pair<String, TypeInformation>>> familyMap = --- End diff -- I just felt a Map is not needed here; in any case the key is a String. If you feel strongly about it, I can change it.
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 I am not sure how we will manage this. One thing that can be done: if an invalid family name is added to the scan, HBase internally throws a NoSuchColumnFamilyException, so we can catch that and report back. But I am not sure how to track the schema. So every time the user wants to scan two families, they have to call HBaseTableSchema#addColumn() twice (with the required qualifiers). @tonycox Can you help me understand what you mean by 'so we can add more maps that generate after familyMap has set'?
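The call pattern described above can be sketched with a simplified, dependency-free stand-in for the PR's HBaseTableSchema: one addColumn call per (family, qualifier) pair, so scanning two families means registering qualifiers under both family keys. The class and method names mirror the PR, but this implementation is hypothetical:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch: a minimal family -> qualifiers map behind addColumn().
public class SchemaSketch {

    private final Map<String, List<String>> familyMap = new LinkedHashMap<>();

    public void addColumn(String family, String qualifier) {
        familyMap.computeIfAbsent(family, k -> new ArrayList<>()).add(qualifier);
    }

    public Set<String> families() {
        return familyMap.keySet();
    }

    public static void main(String[] args) {
        SchemaSketch schema = new SchemaSketch();
        schema.addColumn("f1", "q1");   // first family, first qualifier
        schema.addColumn("f1", "q2");   // same family, second qualifier
        schema.addColumn("f2", "q1");   // second family needs its own call
        System.out.println(schema.families());
    }
}
```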
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 For ProjectableTableSource I think I need some clarity, because it is currently based on an int[] representing the fields, and I am not sure how to map those indexes to qualifiers under a family.
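One way the int[] question above could be resolved (a hedged sketch, not the eventual API): flatten the registered (family, qualifier) pairs into a fixed order, then treat each int that ProjectableTableSource hands over as an index into that flat list:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: resolve flat field indexes back to family:qualifier columns.
public class ProjectionSketch {

    // columns[i] = {family, qualifier} at flat field position i
    public static List<String> resolve(String[][] columns, int[] fields) {
        List<String> projected = new ArrayList<>();
        for (int f : fields) {
            projected.add(columns[f][0] + ":" + columns[f][1]);
        }
        return projected;
    }

    public static void main(String[] args) {
        String[][] cols = {{"f1", "q1"}, {"f1", "q2"}, {"f2", "q1"}};
        // a projection of fields 0 and 2 keeps f1:q1 and f2:q1
        System.out.println(resolve(cols, new int[]{0, 2}));
    }
}
```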
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 @fhueske, I have addressed the review comments as per @wuchong, and he gave a +1 after the fixes. Could you take a look at the PR and see whether it is fine with you too?
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149
> Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "RpcServer.reader=6,bindAddress=testing-docker-bb4f2e37-e79f-42a3-a9e9-4995e42c70ba,port=45919"
> Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "CHAIN DataSource (at getDataSet(HBaseTableSource.java:63) (org.apache.flink.addons.hbase.HBaseTableSourceInputFormat)) -> FlatMap (select: (f1.q1 AS q1, f1.q2 AS q2, f1.q3 AS q3)) (6/32)"
> 01/27/2017 05:57:12 DataSink (collect())(13/32) switched to DEPLOYING
> Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "LeaseRenewer:travis@localhost:39289"
> 01/27/2017 05:57:14 DataSink (collect())(12/32) switched to SCHEDULED
> Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "CHAIN DataSource (at getDataSet(HBaseTableSource.java:63) (org.apache.flink.addons.hbase.HBaseTableSourceInputFormat)) -> FlatMap (select: (f1.q1 AS q1, f1.q2 AS q2, f1.q3 AS q3)) (10/32)"

I am getting this error in the Travis build, but on my Linux box the tests pass.
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 @wuchong I think I have addressed your last round of comments. Thank you for all your help and support here.
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r98145293 --- Diff: flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java --- @@ -0,0 +1,196 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.addons.hbase.example; + +import org.apache.flink.addons.hbase.HBaseTableSchema; +import org.apache.flink.addons.hbase.HBaseTableSource; +import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.sources.BatchTableSource; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.types.Row; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter { + + private static final byte[] ROW_1 = Bytes.toBytes("row1"); + private static final byte[] ROW_2 = Bytes.toBytes("row2"); + private static final byte[] ROW_3 = Bytes.toBytes("row3"); + private static final byte[] F_1 = Bytes.toBytes("f1"); + private static final byte[] F_2 = Bytes.toBytes("f2"); + private static final byte[] Q_1 = Bytes.toBytes("q1"); + private static final byte[] Q_2 = Bytes.toBytes("q2"); + private static final byte[] Q_3 = Bytes.toBytes("q3"); + + @BeforeClass + public static void activateHBaseCluster(){ + registerHBaseMiniClusterInClasspath(); + } + + @Test + public void testHBaseTableSourceWithSingleColumnFamily() throws Exception { + // create a table with single region + TableName tableName = TableName.valueOf("test"); + // no 
split keys + byte[][] famNames = new byte[1][]; + famNames[0] = F_1; + createTable(tableName, famNames, null); + // get the htable instance + HTable table = openTable(tableName); + List puts = new ArrayList(); + // add some data + Put put = new Put(ROW_1); + // add 3 qualifiers per row + //1st qual is integer + put.addColumn(F_1, Q_1, Bytes.toBytes(100)); + //2nd qual is String + put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue")); + // 3rd qual is long + put.addColumn(F_1, Q_3, Bytes.toBytes(19991L)); + puts.add(put); + + put = new Put(ROW_2); + // add 3 qualifiers per row + //1st qual is integer + put.addColumn(F_1, Q_1, Bytes.toBytes(101)); + //2nd qual is String + put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue1")); + // 3rd qual is long + put.addColumn(F_1, Q_3, Bytes.toBytes(19992L)); + puts.add(put); + + put = new Put(ROW_3); + // add 3 qualifiers per row + //1st qual is integer + put.addColumn(F_1, Q_1, Bytes.toBytes(102)); +
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 Fixed all the minor comments given above. @tonycox, @wuchong, @fhueske.
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97934537 --- Diff: flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java --- @@ -0,0 +1,248 @@
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97934510 --- Diff: flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java --- @@ -0,0 +1,248 @@
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97934502 --- Diff: flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java --- @@ -0,0 +1,248 @@ + public static final byte[] ROW_1 = Bytes.toBytes("row1"); + public static final byte[] ROW_2 = Bytes.toBytes("row2"); + public static final byte[] ROW_3 = Bytes.toBytes("row3"); + public static final byte[] F_1 = Bytes.toBytes("f1"); + public static final byte[] F_2 = Bytes.toBytes("f2"); + public static final byte[] Q_1 = Bytes.toBytes("q1"); + public static final byte[] Q_2 = Bytes.toBytes("q2"); + public static final byte[] Q_3 = Bytes.toBytes("q3"); --- End diff -- Yeah, my bad. Will remove.
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97934495 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.table.sources.BatchTableSource; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.conf.Configuration; + +/** + * Creates a table source that helps to scan data from an hbase table + * + * Note : the colNames are specified along with a familyName and they are seperated by a ':' + * For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier name + */ +// TODO : Implement ProjectableTableSource? +public class HBaseTableSource implements BatchTableSource { --- End diff -- Am not sure.. 
For now I think we will implement BatchTableSource only and implement StreamTableSource later. Is there any significant design expectation for a source to be a StreamTableSource?
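A dependency-free sketch of the design question above: nothing prevents a single table source class from implementing both a batch and a stream interface over the same schema. The two interfaces here are simplified stand-ins for Flink's BatchTableSource / StreamTableSource, not the real API:

```java
// Stand-ins for the batch and stream sides of a table source.
interface BatchSide { String scanBatch(); }

interface StreamSide { String scanStream(); }

// Sketch: one source class serving both execution modes.
public class DualTableSource implements BatchSide, StreamSide {

    @Override
    public String scanBatch() { return "bounded scan"; }

    @Override
    public String scanStream() { return "unbounded scan"; }
}
```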
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97934428 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java --- @@ -0,0 +1,137 @@ + // Allowed types. This may change.
+ // TODO : Check if the Date type should be the one in java.util or the one in java.sql --- End diff -- OK, makes sense.
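A small, dependency-free check relevant to the TODO above: java.sql.Date extends java.util.Date, but it is meant to carry a date only, and its toString() drops the time-of-day component:

```java
// Sketch: the relationship between java.util.Date and java.sql.Date.
public class DateSketch {

    public static boolean sqlDateIsAUtilDate() {
        return java.util.Date.class.isAssignableFrom(java.sql.Date.class);
    }

    public static String sqlDateString(long millis) {
        return new java.sql.Date(millis).toString(); // yyyy-MM-dd, no time part
    }
}
```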
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97934446 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java --- @@ -0,0 +1,137 @@ + // Allowed types. This may change.
+ // TODO : Check if the Date type should be the one in java.util or the one in java.sql + private static Class[] CLASS_TYPES = { + Integer.class, Short.class, Float.class, Long.class, String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, BigDecimal.class, Date.class, byte[].class + }; + /** +* Allows specifying the family and qualifier name along with the data type of the qualifier for an HBase table +* +* @param familythe family name +* @param qualifier the qualifier name +* @param clazz the data type of the qualifier +*/ + public void addColumn(String family, String qualifier, Class clazz) { + Preconditions.checkNotNull(family, "family name"); + Preconditions.checkNotNull(family, "qualifier name"); --- End diff -- Good catch. ---
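The validation being discussed in this thread — accepting only a fixed whitelist of qualifier types and throwing for everything else — can be sketched in plain Java. This is a standalone sketch, not Flink code: `CLASS_TYPES` mirrors the array in the diff above, and `checkSupported` is a hypothetical helper name.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Date;

public class TypeWhitelistSketch {

    // Mirrors the CLASS_TYPES array in the quoted diff.
    private static final Class<?>[] CLASS_TYPES = {
        Integer.class, Short.class, Float.class, Long.class, String.class,
        Byte.class, Boolean.class, Double.class, BigInteger.class,
        BigDecimal.class, Date.class, byte[].class
    };

    // Hypothetical helper: reject any type outside the whitelist,
    // which is the behavior agreed on later in this review thread.
    static Class<?> checkSupported(Class<?> clazz) {
        for (Class<?> allowed : CLASS_TYPES) {
            if (allowed == clazz) {
                return clazz;
            }
        }
        throw new IllegalArgumentException("Unsupported type: " + clazz.getName());
    }
}
```

Rejecting unknown types up front keeps the failure at schema-definition time rather than surfacing later as a silent fallback to `byte[]`.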
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97934406 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.ArrayList; +import java.util.Date; + +/** + * Helps to specify an HBase Table's schema + */ +public class HBaseTableSchema implements Serializable { + + // A Map with key as column family. + private final Map<String, List<Pair<String, TypeInformation>>> familyMap = + new HashMap<String, List<Pair<String, TypeInformation>>>(); --- End diff -- Ok. In our other projects we used to qualify the generic on both the sides. 
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 @fhueske , @tonycox , @wuchong - FYI. ---
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 Updated the code per the comments and pushed again. I think I have addressed all the comments here; feedback and comments are welcome. I also found it is better to use the TableInputSplit to specify the start and end row, so that the scan is restricted to the given range. ---
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97738061 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java --- @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.ArrayList; +import java.util.Date; + +/** + * Helps to specify an HBase Table's schema + */ +public class HBaseTableSchema implements Serializable { + + // A Map with key as column family. + private final Map<String, List> familyMap = + new HashMap<String, List>(); + + // Allowed types. This may change. 
+ // TODO : Check if the Date type should be the one in java.util or the one in java.sql + private static Class[] CLASS_TYPES = { + Integer.class, Short.class, Float.class, Long.class, String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, BigDecimal.class, Date.class + }; + private static byte[] EMPTY_BYTE_ARRAY = new byte[0]; + public void addColumns(String family, String qualifier, TypeInformation type) { + Preconditions.checkNotNull(family, "family name"); + Preconditions.checkNotNull(family, "qualifier name"); + Preconditions.checkNotNull(type, "type name"); + List list = this.familyMap.get(family); + if (list == null) { + list = new ArrayList(); + } + boolean found = false; + for(Class classType : CLASS_TYPES) { + if(classType == type.getTypeClass()) { + found = true; + break; + } + } + if(!found) { + // by default it will be byte[] type only + type = BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO; --- End diff -- Ok, I will add byte[].class into the CLASS_TYPES. For anything other than the ones in CLASS_TYPES we can throw an exception. ---
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97736554 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java --- @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.ArrayList; +import java.util.Date; + +/** + * Helps to specify an HBase Table's schema + */ +public class HBaseTableSchema implements Serializable { + + // A Map with key as column family. + private final Map<String, List> familyMap = + new HashMap<String, List>(); + + // Allowed types. This may change. 
+ // TODO : Check if the Date type should be the one in java.util or the one in java.sql + private static Class[] CLASS_TYPES = { + Integer.class, Short.class, Float.class, Long.class, String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, BigDecimal.class, Date.class + }; + private static byte[] EMPTY_BYTE_ARRAY = new byte[0]; + public void addColumns(String family, String qualifier, TypeInformation type) { + Preconditions.checkNotNull(family, "family name"); + Preconditions.checkNotNull(family, "qualifier name"); + Preconditions.checkNotNull(type, "type name"); + List list = this.familyMap.get(family); + if (list == null) { + list = new ArrayList(); + } + boolean found = false; + for(Class classType : CLASS_TYPES) { + if(classType == type.getTypeClass()) { + found = true; + break; + } + } + if(!found) { + // by default it will be byte[] type only + type = BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO; + } + list.add(new Pair(qualifier, type)); + familyMap.put(family, list); + } + + public Map<String, List> getFamilyMap() { + return this.familyMap; + } + + public Object deserialize(byte[] value, TypeInformation typeInfo) { + if (typeInfo.isBasicType()) { + if (typeInfo.getTypeClass() == Integer.class) { + return Bytes.toInt(value); + } else if (typeInfo.getTypeClass() == Short.class) { + return Bytes.toShort(value); + } else if (typeInfo.getTypeClass() == Float.class) { + return Bytes.toFloat(value); + } else if (typeInfo.getTypeClass() == Long.class) { + return Bytes.toLong(value); + } else if (typeInfo.getTypeClass() == String.class) { + return Bytes.toString(value); + } else if (typeInfo.getTypeClass() == Byte.class) { + return value[0]; + } else if (typeInfo.getTypeClass() == Boolean.class) { + return Bytes.toBoolean(value); + } else if (typeInfo.getTypeClass() == Double.class) { + return Bytes.toDouble(value); + } else if (typeInfo.getTypeClass() == BigInteger.class) { +
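The `deserialize` dispatch in the diff above relies on HBase's `Bytes` utility, which encodes primitives big-endian. A minimal JDK-only stand-in shows the same dispatch-on-target-class pattern; the method names imitate `Bytes` but are local sketches, not the HBase API, and only two branches are shown.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BytesSketch {
    // Big-endian int encoding, matching what HBase's Bytes.toBytes(int) produces.
    static byte[] toBytes(int v) {
        return ByteBuffer.allocate(4).putInt(v).array();
    }

    static int toInt(byte[] b) {
        return ByteBuffer.wrap(b).getInt();
    }

    // Dispatch on the target class, as deserialize() does with TypeInformation.
    static Object deserialize(byte[] value, Class<?> clazz) {
        if (clazz == Integer.class) {
            return toInt(value);
        } else if (clazz == String.class) {
            return new String(value, StandardCharsets.UTF_8);
        }
        return value; // unknown types fall back to the raw bytes
    }
}
```

The long if/else-if chain works, though a `Map<Class<?>, Function<byte[], Object>>` of decoders would avoid the repeated `getTypeClass()` comparisons.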
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97709565 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.types.Row; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * {@link InputFormat} subclass that wraps the access for HTables. 
Returns the result as {@link Row} + */ +public class HBaseTableSourceInputFormat extends TableInputFormat implements ResultTypeQueryable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(HBaseTableSourceInputFormat.class); + private String tableName; + private transient Connection conn; + private transient org.apache.hadoop.conf.Configuration conf; + private HBaseTableSchema schema; + + public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, HBaseTableSchema schema) { + this.tableName = tableName; + this.conf = conf; + this.schema = schema; + } + + @Override + public void configure(Configuration parameters) { + LOG.info("Initializing HBaseConfiguration"); + connectToTable(); + if(table != null) { + scan = getScanner(); + } + } + + @Override + protected Scan getScanner() { + // TODO : Pass 'rowkey'. For this we need FilterableTableSource + Scan scan = new Scan(); + Map<String, List> familyMap = schema.getFamilyMap(); + for(String family : familyMap.keySet()) { + // select only the fields in the 'selectedFields' + List colDetails = familyMap.get(family); + for(Pair<String, TypeInformation> pair : colDetails) { + scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(pair.getFirst())); + } + } + return scan; + } + + @Override + public String getTableName() { + return tableName; + } + + @Override + protected Row mapResultToTuple(Result res) { + List values = new ArrayList(); + int i = 0; + Map<String, List> familyMap = schema.getFamilyMap(); + Row[] rows = new Row[familyMap.size()]; + for(String family : familyMap.keySet()) { + List colDetails = familyMap.get(family); + for(Pair<String, TypeInformation> pair : colDetails) { + byte[] value = res.getValue(Bytes.toBytes(family), Bytes.toBytes(pair.getFirst())); + if(value != null) { +
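`getScanner()` above walks the family map and adds every family/qualifier pair to the `Scan`. The traversal can be sketched without the HBase classes; `columnsToScan` is a hypothetical helper, and a real implementation would call `scan.addColumn(family, qualifier)` instead of collecting strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ScanColumnsSketch {
    // Collect every family:qualifier pair the Scan would select.
    static List<String> columnsToScan(Map<String, List<String>> familyMap) {
        List<String> cols = new ArrayList<>();
        for (Map.Entry<String, List<String>> family : familyMap.entrySet()) {
            for (String qualifier : family.getValue()) {
                // stands in for scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier))
                cols.add(family.getKey() + ":" + qualifier);
            }
        }
        return cols;
    }
}
```

Iterating over `entrySet()` rather than `keySet()` plus a `get()` per family avoids the double lookup in the quoted code.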
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97709590 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java --- @@ -19,99 +19,113 @@ package org.apache.flink.addons.hbase; import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.common.io.LocatableInputSplitAssigner; -import org.apache.flink.api.common.io.RichInputFormat; -import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.types.Row; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.ClusterConnection; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.HRegionLocator; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.math.BigDecimal; -import java.math.BigInteger; import java.util.ArrayList; -import java.util.Date; import java.util.List; +import java.util.Map; /** * {@link InputFormat} subclass that 
wraps the access for HTables. Returns the result as {@link Row} */ -public class HBaseTableSourceInputFormat extends RichInputFormat<Row, TableInputSplit> implements ResultTypeQueryable { +public class HBaseTableSourceInputFormat extends TableInputFormat implements ResultTypeQueryable { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(HBaseTableSourceInputFormat.class); private String tableName; - private TypeInformation[] fieldTypeInfos; - private String[] fieldNames; - private transient Table table; - private transient Scan scan; private transient Connection conn; - private ResultScanner resultScanner = null; - - private byte[] lastRow; - private int scannedRows; - private boolean endReached = false; - private org.apache.hadoop.conf.Configuration conf; - private static final String COLON = ":"; + private transient org.apache.hadoop.conf.Configuration conf; + private HBaseTableSchema schema; - public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, String[] fieldNames, TypeInformation[] fieldTypeInfos) { - this.conf = conf; + public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, HBaseTableSchema schema) { this.tableName = tableName; - this.fieldNames = fieldNames; - this.fieldTypeInfos = fieldTypeInfos; + this.conf = conf; + this.schema = schema; } @Override public void configure(Configuration parameters) { LOG.info("Initializing HBaseConfiguration"); connectToTable(); if(table != null) { - scan = createScanner(); + scan = getScanner(); } } - private Scan createScanner() { + @Override + protected Scan getScanner() { + // TODO : Pass 'rowkey'. 
For this we need FilterableTableSource Scan scan = new Scan(); - for(String field : fieldNames) { + Map<String, List> familyMap = schema.getFamilyMap(); + for(String family : familyMap.keySet()) { // select only the fields in the 'selectedFields' - String[] famCol = field.split(COLON); - scan.addColumn(Bytes.toBytes(famCol[0]), Bytes.toBytes(famCol[1])); + List colDetails = familyMap.get(family); + for(Pair<String,
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97709469 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java --- @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.ArrayList; +import java.util.Date; + +/** + * Helps to specify an HBase Table's schema + */ +public class HBaseTableSchema implements Serializable { + + // A Map with key as column family. + private final Map<String, List> familyMap = + new HashMap<String, List>(); + + // Allowed types. This may change. 
+ // TODO : Check if the Date type should be the one in java.util or the one in java.sql + private static Class[] CLASS_TYPES = { + Integer.class, Short.class, Float.class, Long.class, String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, BigDecimal.class, Date.class + }; + private static byte[] EMPTY_BYTE_ARRAY = new byte[0]; + public void addColumns(String family, String qualifier, TypeInformation type) { --- End diff -- Ok. Let me check that. So if we pass Class there, we could wrap it with the corresponding TypeInformation for our internal usage? ---
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97709482 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java --- @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.ArrayList; +import java.util.Date; + +/** + * Helps to specify an HBase Table's schema + */ +public class HBaseTableSchema implements Serializable { + + // A Map with key as column family. + private final Map<String, List> familyMap = + new HashMap<String, List>(); + + // Allowed types. This may change. 
+ // TODO : Check if the Date type should be the one in java.util or the one in java.sql + private static Class[] CLASS_TYPES = { + Integer.class, Short.class, Float.class, Long.class, String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, BigDecimal.class, Date.class + }; + private static byte[] EMPTY_BYTE_ARRAY = new byte[0]; + public void addColumns(String family, String qualifier, TypeInformation type) { + Preconditions.checkNotNull(family, "family name"); + Preconditions.checkNotNull(family, "qualifier name"); + Preconditions.checkNotNull(type, "type name"); + List list = this.familyMap.get(family); + if (list == null) { + list = new ArrayList(); + } + boolean found = false; + for(Class classType : CLASS_TYPES) { + if(classType == type.getTypeClass()) { + found = true; + break; + } + } + if(!found) { + // by default it will be byte[] type only + type = BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO; --- End diff -- Ok. Got it. ---
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97709518 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java --- @@ -22,54 +22,63 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.table.sources.BatchTableSource; -import org.apache.flink.table.sources.ProjectableTableSource; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.Pair; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** * Creates a table source that helps to scan data from an hbase table * * Note : the colNames are specified along with a familyName and they are seperated by a ':' * For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier name */ -public class HBaseTableSource implements BatchTableSource, ProjectableTableSource { +// TODO : Implement ProjectableTableSource? 
+public class HBaseTableSource implements BatchTableSource { private Configuration conf; private String tableName; - private byte[] rowKey; - private String[] colNames; - private TypeInformation[] colTypes; + private HBaseTableSchema schema; + private String[] famNames; - public HBaseTableSource(Configuration conf, String tableName, byte[] rowKey, String[] colNames, - TypeInformation[] colTypes) { + public HBaseTableSource(Configuration conf, String tableName, HBaseTableSchema schema) { this.conf = conf; this.tableName = Preconditions.checkNotNull(tableName, "Table name"); - this.rowKey = Preconditions.checkNotNull(rowKey, "Rowkey"); - this.colNames = Preconditions.checkNotNull(colNames, "Field names"); - this.colTypes = Preconditions.checkNotNull(colTypes, "Field types"); + this.schema = Preconditions.checkNotNull(schema, "Schema"); + Map<String, List> familyMap = schema.getFamilyMap(); + famNames = familyMap.keySet().toArray(new String[familyMap.size()]); } @Override public TypeInformation getReturnType() { - return new RowTypeInfo(colTypes); - } - - @Override - public DataSet getDataSet(ExecutionEnvironment execEnv) { - return execEnv.createInput(new HBaseTableSourceInputFormat(conf, tableName, colNames, colTypes), getReturnType()); - } + // split the fieldNames + Map<String, List> famMap = schema.getFamilyMap(); - @Override - public ProjectableTableSource projectFields(int[] fields) { - String[] newColNames = new String[fields.length]; - TypeInformation[] newColTypes = new TypeInformation[fields.length]; + List qualNames = new ArrayList(); --- End diff -- I will look into this. ---
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97709503 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java --- @@ -22,54 +22,63 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.table.sources.BatchTableSource; -import org.apache.flink.table.sources.ProjectableTableSource; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.Pair; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** * Creates a table source that helps to scan data from an hbase table * * Note : the colNames are specified along with a familyName and they are seperated by a ':' * For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier name */ -public class HBaseTableSource implements BatchTableSource, ProjectableTableSource { +// TODO : Implement ProjectableTableSource? 
+public class HBaseTableSource implements BatchTableSource { private Configuration conf; private String tableName; - private byte[] rowKey; - private String[] colNames; - private TypeInformation[] colTypes; + private HBaseTableSchema schema; + private String[] famNames; - public HBaseTableSource(Configuration conf, String tableName, byte[] rowKey, String[] colNames, - TypeInformation[] colTypes) { + public HBaseTableSource(Configuration conf, String tableName, HBaseTableSchema schema) { this.conf = conf; this.tableName = Preconditions.checkNotNull(tableName, "Table name"); - this.rowKey = Preconditions.checkNotNull(rowKey, "Rowkey"); - this.colNames = Preconditions.checkNotNull(colNames, "Field names"); - this.colTypes = Preconditions.checkNotNull(colTypes, "Field types"); + this.schema = Preconditions.checkNotNull(schema, "Schema"); + Map<String, List> familyMap = schema.getFamilyMap(); + famNames = familyMap.keySet().toArray(new String[familyMap.size()]); --- End diff -- Fine. Will do it. ---
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97709488 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java --- @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.ArrayList; +import java.util.Date; + +/** + * Helps to specify an HBase Table's schema + */ +public class HBaseTableSchema implements Serializable { + + // A Map with key as column family. + private final Map<String, List> familyMap = + new HashMap<String, List>(); + + // Allowed types. This may change. 
+ // TODO : Check if the Date type should be the one in java.util or the one in java.sql + private static Class[] CLASS_TYPES = { + Integer.class, Short.class, Float.class, Long.class, String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, BigDecimal.class, Date.class + }; + private static byte[] EMPTY_BYTE_ARRAY = new byte[0]; + public void addColumns(String family, String qualifier, TypeInformation type) { + Preconditions.checkNotNull(family, "family name"); + Preconditions.checkNotNull(family, "qualifier name"); + Preconditions.checkNotNull(type, "type name"); + List list = this.familyMap.get(family); + if (list == null) { + list = new ArrayList(); + } + boolean found = false; + for(Class classType : CLASS_TYPES) { + if(classType == type.getTypeClass()) { + found = true; + break; + } + } + if(!found) { + // by default it will be byte[] type only + type = BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO; + } + list.add(new Pair(qualifier, type)); --- End diff -- Ok --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
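The whitelist-and-fallback behaviour in the `addColumns` diff above can be sketched in plain Java. This is a simplified, hypothetical stand-in for the PR's `HBaseTableSchema` (class name `SchemaSketch` and use of raw `Class` tokens instead of Flink's `TypeInformation` are my assumptions, not the PR's API):

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.*;

public class SchemaSketch {
    // Types the schema accepts directly; anything else falls back to byte[].
    private static final Class<?>[] CLASS_TYPES = {
        Integer.class, Short.class, Float.class, Long.class, String.class,
        Byte.class, Boolean.class, Double.class, BigInteger.class,
        BigDecimal.class, Date.class
    };

    // family -> list of (qualifier, type) pairs
    private final Map<String, List<Map.Entry<String, Class<?>>>> familyMap = new HashMap<>();

    public void addColumn(String family, String qualifier, Class<?> type) {
        Objects.requireNonNull(family, "family name");
        Objects.requireNonNull(qualifier, "qualifier name");
        Objects.requireNonNull(type, "type");
        boolean found = false;
        for (Class<?> c : CLASS_TYPES) {
            if (c == type) { found = true; break; }
        }
        if (!found) {
            type = byte[].class; // unknown types default to raw bytes, as in the PR
        }
        familyMap.computeIfAbsent(family, f -> new ArrayList<>())
                 .add(new AbstractMap.SimpleEntry<>(qualifier, type));
    }

    public Class<?> typeOf(String family, String qualifier) {
        for (Map.Entry<String, Class<?>> e
                : familyMap.getOrDefault(family, Collections.emptyList())) {
            if (e.getKey().equals(qualifier)) return e.getValue();
        }
        return null;
    }
}
```

Incidentally, in the diff quoted above the second `Preconditions.checkNotNull` validates `family` rather than `qualifier`, and a newly created `list` is never put back into `familyMap`; the sketch fixes both.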
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 @fhueske, @tonycox, @wuchong I have updated the PR based on all the feedback here. You can now see that we support a composite RowType and can specify multiple column families along with their qualifier names. Results are currently retrieved with a full table scan, which is not efficient; we need to specify start and end rows, but I think that can be done after FilterableTableSource is done. I have added test cases covering a single column family and two column families. For now, if the TypeInformation is not one of the supported types, we fall back to a plain byte[] type; that happens at the validation stage itself. My one main concern is how to represent NULL, i.e. when a column is declared with a type but a row has no data for that column. For now I handle it by returning the type's minimum value (Int, Float, Long MIN_VALUE), but I don't believe that is right. Feedback and suggestions welcome.
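On the NULL question raised above, one alternative to MIN_VALUE sentinels is to use boxed types and return null when the cell is absent. A hypothetical decode helper (not the PR's code) contrasting the two approaches:

```java
import java.nio.ByteBuffer;

public class CellDecoder {
    // Decode an int cell; 'value' is the raw cell bytes, or null if the
    // column had no cell for this row.
    static Integer decodeInt(byte[] value) {
        if (value == null) {
            return null; // absent column -> SQL NULL, not a sentinel
        }
        return ByteBuffer.wrap(value).getInt();
    }

    // The sentinel approach from the PR discussion: ambiguous, because
    // Integer.MIN_VALUE is also a perfectly legal stored value.
    static int decodeIntWithSentinel(byte[] value) {
        return value == null ? Integer.MIN_VALUE
                             : ByteBuffer.wrap(value).getInt();
    }
}
```

The boxed/null variant round-trips cleanly into Flink's `Row`, whose fields may be null, whereas a sentinel cannot be distinguished from real data.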
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 Good news: with the help of the composite RowType, after modifying my code accordingly and some debugging, I got the basic case working. I will now stitch things together and submit a PR with the updated changes.
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 Thanks for all the input here. I have been trying to make my existing code work with the composite RowTypeInfo. Once that is done I will introduce the HBaseTableSchema. I would also like to work on FLINK-3849 (FilterableTableSource) after this first version of HBaseTableSource is accepted.
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97304540 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.table.sources.BatchTableSource; +import org.apache.flink.table.sources.ProjectableTableSource; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.conf.Configuration; + +/** + * Creates a table source that helps to scan data from an hbase table + * + * Note : the colNames are specified along with a familyName and they are seperated by a ':' + * For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier name + */ +public class HBaseTableSource implements BatchTableSource, ProjectableTableSource { + + private Configuration conf; + private String tableName; + private byte[] rowKey; + private String[] colNames; + private TypeInformation[] colTypes; + + public HBaseTableSource(Configuration conf, String tableName, byte[] rowKey, String[] colNames, + TypeInformation[] colTypes) { + this.conf = conf; + this.tableName = Preconditions.checkNotNull(tableName, "Table name"); + this.rowKey = Preconditions.checkNotNull(rowKey, "Rowkey"); --- End diff -- I tried this customization but still if I pass something like f1.q1 it was throwing a validation error which was fine after I used it as suggested by @tonycox . --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97055300 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.table.sources.BatchTableSource; +import org.apache.flink.table.sources.ProjectableTableSource; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.conf.Configuration; + +/** + * Creates a table source that helps to scan data from an hbase table + * + * Note : the colNames are specified along with a familyName and they are seperated by a ':' + * For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier name + */ +public class HBaseTableSource implements BatchTableSource, ProjectableTableSource { + + private Configuration conf; + private String tableName; + private byte[] rowKey; + private String[] colNames; + private TypeInformation[] colTypes; + + public HBaseTableSource(Configuration conf, String tableName, byte[] rowKey, String[] colNames, + TypeInformation[] colTypes) { + this.conf = conf; + this.tableName = Preconditions.checkNotNull(tableName, "Table name"); + this.rowKey = Preconditions.checkNotNull(rowKey, "Rowkey"); --- End diff -- `new RowTypeInfo( new TypeInformation[]{ new RowTypeInfo( new TypeInformation[]{ BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO}, new String[]{"name", "age"}) }, new String[]{"person"} );` I did the above change for the HBaseTableSource's return type and then made a simple query ` tableEnv.registerTableSource("test", hbaseTable); Table result = tableEnv .sql("SELECT f1.q1, f1.q2, f1.q3 FROM test");` It throws sql validation error and considers f1 as table name and says tableName 'f1' not found. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
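The validation error described in the comment above comes down to resolving a two-part name like `f1.q1` against a nested row type. A toy resolver over a nested map (a stand-in for Flink's composite RowTypeInfo, not its actual API; all names here are illustrative) shows the intended resolution:

```java
import java.util.Map;

public class NestedTypeResolver {
    // A nested "row type": field name -> either a Class (leaf column type)
    // or another Map (a nested row, e.g. a column family).
    static Object resolve(Map<String, Object> rowType, String path) {
        Object current = rowType;
        for (String part : path.split("\\.")) {
            if (!(current instanceof Map)) {
                throw new IllegalArgumentException(
                    "'" + part + "' cannot be resolved: parent is not a row");
            }
            current = ((Map<?, ?>) current).get(part);
            if (current == null) {
                throw new IllegalArgumentException("field '" + part + "' not found");
            }
        }
        return current;
    }
}
```

The SQL validator must perform this same two-step lookup; if it instead treats the first path segment as a table name, `f1.q1` fails exactly as reported.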
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97015379 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java --- @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.LocatableInputSplitAssigner; +import org.apache.flink.api.common.io.RichInputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.types.Row; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.HRegionLocator; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +/** + * {@link InputFormat} subclass that wraps the access for HTables. 
Returns the result as {@link Row} + */ +public class HBaseTableSourceInputFormat extends RichInputFormat<Row, TableInputSplit> implements ResultTypeQueryable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(HBaseTableSourceInputFormat.class); + private String tableName; + private TypeInformation[] fieldTypeInfos; + private String[] fieldNames; + private transient Table table; + private transient Scan scan; + private transient Connection conn; + private ResultScanner resultScanner = null; + + private byte[] lastRow; + private int scannedRows; + private boolean endReached = false; + private org.apache.hadoop.conf.Configuration conf; + private static final String COLON = ":"; + + public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, String[] fieldNames, TypeInformation[] fieldTypeInfos) { + this.conf = conf; + this.tableName = tableName; + this.fieldNames = fieldNames; + this.fieldTypeInfos = fieldTypeInfos; + } + + @Override + public void configure(Configuration parameters) { + LOG.info("Initializing HBaseConfiguration"); + connectToTable(); + if(table != null) { + scan = createScanner(); + } + } + + private Scan createScanner() { + Scan scan = new Scan(); + for(String field : fieldNames) { + // select only the fields in the 'selectedFields' + String[] famCol = field.split(COLON); + scan.addColumn(Bytes.toBytes(famCol[0]), Bytes.toBytes(famCol[1])); + } + return scan; + } + + private void connectToTable() { + //use files found in the classpath + if(this.con
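The `createScanner` logic quoted above (the hunk is truncated in this archive) splits each `family:qualifier` field name on a colon before adding it to the Scan. The split in isolation, as a plain-Java sketch (the limit-2 split and the error check are my additions, not the PR's code):

```java
public class ColumnName {
    private static final String COLON = ":";

    // Split "cf1:q1" into {"cf1", "q1"}; limit 2 so qualifiers may
    // themselves contain ':'.
    static String[] split(String field) {
        String[] famCol = field.split(COLON, 2);
        if (famCol.length != 2) {
            throw new IllegalArgumentException("expected family:qualifier, got " + field);
        }
        return famCol;
    }
}
```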
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97015321 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.table.sources.BatchTableSource; +import org.apache.flink.table.sources.ProjectableTableSource; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.conf.Configuration; + +/** + * Creates a table source that helps to scan data from an hbase table + * + * Note : the colNames are specified along with a familyName and they are seperated by a ':' + * For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier name + */ +public class HBaseTableSource implements BatchTableSource, ProjectableTableSource { + + private Configuration conf; + private String tableName; + private byte[] rowKey; + private String[] colNames; + private TypeInformation[] colTypes; + + public HBaseTableSource(Configuration conf, String tableName, byte[] rowKey, String[] colNames, + TypeInformation[] colTypes) { + this.conf = conf; + this.tableName = Preconditions.checkNotNull(tableName, "Table name"); + this.rowKey = Preconditions.checkNotNull(rowKey, "Rowkey"); --- End diff -- Yes. That is true but do we always want full table scan? Actually in HBase it is better we specify start and end key. So how do we specify that? I have not used this rowKey now but I thought it is better to be used? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97015267 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java --- (same diff hunk as quoted above; the comment text was truncated in this archive) ---
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 Thanks for the ping here @tonycox.
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r96790253 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java --- (same diff hunk as quoted above; the comment text was truncated in this archive) ---
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r96790136 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java --- (same diff hunk as quoted above; the comment text was truncated in this archive) ---