[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user nielsbasjes commented on the issue: https://github.com/apache/flink/pull/2332 FYI: In close relation to this issue I submitted an enhancement on the HBase side to support these kinds of use cases much better: https://issues.apache.org/jira/browse/HBASE-19486 https://issues.apache.org/jira/browse/HBASE-19673 Simply put: you can now specify a maximum time that records may be kept in the BufferedMutator. If they exceed this value, a flush is done automatically. ---
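For readers who want to try the HBase-side enhancement described above, here is a minimal sketch, assuming HBase 2.0+ where the `setWriteBufferPeriodicFlushTimeoutMs` setter added by HBASE-19486 is available; the table and column names are made up, and a live cluster is required to actually run it:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PeriodicFlushExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf)) {
            BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("example_table"));
            // Flush any mutation that has been sitting in the buffer for more
            // than 1 second, even if the write buffer is not yet full.
            params.setWriteBufferPeriodicFlushTimeoutMs(1000L);
            try (BufferedMutator mutator = conn.getBufferedMutator(params)) {
                Put put = new Put(Bytes.toBytes("row1"));
                put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
                mutator.mutate(put);
                // No explicit flush() call needed here: the background timer
                // flushes the buffered Put within roughly one second.
            }
        }
    }
}
```

This removes the failure mode where low-traffic streams leave records sitting in the client buffer indefinitely.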
Github user nragon commented on the issue: https://github.com/apache/flink/pull/2332 Indeed, I'm also not aware of how users use it. I would take a similar approach to C*, which is already made for streaming and seems to use an existing async API from DataStax. But again, I couldn't find accurate feedback regarding backpressure. If this is an issue, then working with Flink state is a must, imo. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2332 So I think, at a first level, let us have put/delete mutations alone for streaming? Since I am not aware of how Flink users are currently interacting with HBase, I am not sure what HBase ops should be supported. From your case it seems puts/deletes are the common cases. ---
Github user nragon commented on the issue: https://github.com/apache/flink/pull/2332 No, only puts. Aggregation is coming from a reduce which by itself aggregates, keeping recent values in HBase, more like snapshots I think. During our analysis, sending data to HBase was only reliable when working with aggregates; otherwise a huge amount of backpressure will come from the region servers. Full dumps are more likely to work on Elasticsearch or HDFS than HBase. So, because Flink does an amazing job at keeping state, why bother to overload region servers with so many requests? Again, our point of view, of course. ---
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2332 @nragon I agree. But does your use case have increments/appends? ---
Github user nragon commented on the issue: https://github.com/apache/flink/pull/2332 From my point of view, my sample works fine (my use case) ---
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2332 I can take this up and come up with a design doc. Reading through the comments here and the final decision points, I think only puts/deletes can be considered idempotent, while increments/decrements cannot. We may then need two types of sink: one that supports puts/deletes and another that supports non-idempotent ops. Coming to @nielsbasjes's issue of not being able to flush the BufferedMutator: should you always call BufferedMutator#flush() on every checkpoint call? ---
Github user nragon commented on the issue: https://github.com/apache/flink/pull/2332 I've made a custom solution which works for my use cases. Notice that the code attached is not working as-is because it's only a skeleton. This prototype uses asynchbase and tries to manage the throttling issues mentioned above. The way I do this is by limiting requests per client to 1000 (also configurable, depending on HBase capacity and response) and skipping records after reaching that threshold. Every skipped record is tagged with the system timestamp, always keeping the most recent skipped record for later updates. Now, in my use case I always use a keyBy -> reduce before the sink, which keeps the aggregation state, meaning that every record invoked by the HBase sink will have the last aggregated value from the previous operators. When all requests are done (`pending == 0`) I compare the last skipped record with the last requested record; if the skipped timestamp is less than the requested timestamp, it means HBase has the last aggregation. There is plenty of room for improvements, I just didn't have the time. [HBaseSink.txt](https://github.com/apache/flink/files/1014991/HBaseSink.txt) ---
Github user fpompermaier commented on the issue: https://github.com/apache/flink/pull/2332 Anyone working on this? An HBase streaming sink would be a very nice addition... ---
Github user nragon commented on the issue: https://github.com/apache/flink/pull/2332 Currently trying to fix some throttling issues, reported here https://github.com/OpenTSDB/asynchbase/issues/162 ---
Github user delding commented on the issue: https://github.com/apache/flink/pull/2332 Hi @ramkrish86 , sorry, I don't have the bandwidth to work on it. ---
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2332 @delding Are you still working on this? @nragon Let me know how I can be of help here. I can work on this PR too, since I have some context on the existing PR, though it is stale. ---
Github user nragon commented on the issue: https://github.com/apache/flink/pull/2332 Hi, I'm testing one myself with a third-party library, http://opentsdb.github.io/asynchbase. I'm following a similar approach to the Cassandra sink. Testing it as we speak. Thanks ---
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2332 Hi @nragon, the PR seems to be stale. Are you interested in contributing a streaming HBase sink? ---
Github user nragon commented on the issue: https://github.com/apache/flink/pull/2332 Any updates on this sink? ---
Github user nragon commented on the issue: https://github.com/apache/flink/pull/2332 Hi :) I needed to use HBase as a sink, so I decided to take a look at this pull request and use it. Current changes that might be interesting: like the HBase connector for DataSet, it should be possible to define the configuration used to connect, for instance, to a remote HBase master. In [HBaseSink](https://github.com/delding/flink/blob/16ad4b2ac13567ceba7bacf15e4698fb4ce17c53/flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseSink.java) the `HBaseConfiguration.create()` call should receive these configurations, set by the constructor (or another method). Hope this makes sense. Thanks ---
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2332 Thanks for these discussions @delding and @ramkrish86. I think this is getting a bit hard to follow. What do you think about writing a design doc that describes the supported HBase features and consistency guarantees of the sink? The design doc does not need to be a full-fledged [FLIP](https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals), just a document to share and discuss. I think this would make it easier for others to follow and join the discussion. I also find that explaining designs helps to improve them. ---
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2332 Ok. Then it should be clearly documented now that the sink supports only Puts/Deletes. So can the sink be updated with new APIs in the future? I don't know the procedure here in case new APIs have to be added to the sink. Since the sink is not part of the Flink distro it can be enhanced or modified; just wanted to know. Maybe as a first step make things simple: -> Support only Puts/Deletes. -> If you want order to be guaranteed, go with RowMutations. -> Clearly document what the sink does now. -> Please verify whether it is OK to update the sink to support other APIs in the future. Anyway, this will give you an at-least-once guarantee. ---
Github user delding commented on the issue: https://github.com/apache/flink/pull/2332 I agree it might be overkill. But in the case of a sink that only supports Put/Delete, it would be better to have ordered execution than not; after all, HBase has this API, so there could be some use case that needs an order guarantee. ---
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2332 Within a single row you need a guarantee on the order of execution? I agree Append/Increment are non-idempotent in certain failure cases, but there is a nonce generator for that. I should say I have not used it, so I don't know its exact pros and cons. RowMutations at least supports only Put/Delete, so from that angle you can be sure that for now we don't support Append/Increment. > we might need WriteAheadSink and figure out a way to roll back table to the last checkpoint state This will be tricky. Rolling back your HBase table would mean that you may have to issue Deletes so that the previous mutations are hidden. ---
Github user delding commented on the issue: https://github.com/apache/flink/pull/2332 Hi @ramkrish86 , I'm thinking of replacing batch() with mutateRow() because it provides atomic, ordered mutations for a single row. It only supports Put and Delete, which should be fine since only Put and Delete are idempotent; this way we can implement Put and Delete without using WriteAheadSink (in the case of deterministic processing). What do you think? Regarding Append and Increment, as HBase doesn't support distributed transactions across multiple rows, we might need WriteAheadSink and a way to roll back the table to the last checkpoint state. I'm thinking about this right now. So it might make sense to have two HBaseSinks: one for Put/Delete, the other for Append/Increment and non-deterministic processing. ---
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2332 @delding Do you have use cases with Append/Increment? I think with the batch() API we are not sure of the order of execution of the batch on the HBase server, but it would still help us achieve the at-least-once guarantee, as @zentol mentioned here. ---
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2332 @nielsbasjes I want to avoid excluding a version that is still in use by existing Flink users. I do not have insight into which HBase versions are currently in use. If (basically) everybody is on 1.1.x, I am definitely in favor of bumping the version and API for Flink 1.2.0. Let's move the discussion to the dev and user mailing lists. ---
Github user nielsbasjes commented on the issue: https://github.com/apache/flink/pull/2332 @fhueske: What is "older"? I would like a clear statement about the (minimal) supported versions of HBase. I would see 1.1.x as old enough, or do you see 0.98 as still required? ---
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2332 @nielsbasjes We would break compatibility with older HBase versions. Before we do that, we should start a discussion on the user and dev mailing lists. ---
Github user nielsbasjes commented on the issue: https://github.com/apache/flink/pull/2332 @fhueske Should the TableInputFormat be updated to use the HBase 1.1.2 API also? It would make things a bit cleaner. ---
Github user zentol commented on the issue: https://github.com/apache/flink/pull/2332 @ramkrish86 that is correct. ---
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2332 > This scheme does not provide exactly-once delivery guarantees, however at any given point in time the table would be in a state as if the updates were only sent once. This is the same guarantee that we provide for Cassandra. @zentol - so for Cassandra it is also an at-least-once guarantee that we provide? So even if the mutations happen once again, since the operations are idempotent we don't have any issues with such duplicates? Am I getting it right here? ---
Github user nielsbasjes commented on the issue: https://github.com/apache/flink/pull/2332 Yes, as long as everything is compatible with HBase 1.1.2 it's fine for me. ---
Github user delding commented on the issue: https://github.com/apache/flink/pull/2332 Thanks for the very detailed and helpful comments, let me work on it :-) ---
Github user zentol commented on the issue: https://github.com/apache/flink/pull/2332 Don't you lose any guarantees regarding the order of mutations the moment you use asynchronous updates anyway? The WriteAheadSink should only be used if you want to deal with non-deterministic programs or want to send data in atomic mini-batches and must rely on the order of elements. Otherwise there are much simpler solutions. If you do idempotent updates, the only thing you have to do is write the data into HBase and make sure that every update sent for a given checkpoint is acknowledged before the checkpoint is regarded as complete. If you don't acknowledge them you lose at-least-once guarantees. This scheme does not provide exactly-once *delivery* guarantees, however at any given point in time the table would be in a state as if the updates were only sent once. This is the same guarantee that we provide for Cassandra. For non-idempotent updates things get a lot more difficult. If you can fire an entire checkpoint as a single atomic batch you just won the lottery, as you can use the above scheme and a small auxiliary table to track completed checkpoints per sink subtask. If you can't do that, you will have to use system-specific features/guarantees to engineer a solution that provides exactly-once guarantees: versioning, rollbacks, unique IDs; something that either allows you to revert the table to a clean state or track precisely which updates were applied and send the remaining updates. ---
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2332 Thanks for looking into the Cassandra connector. Maybe @zentol (who implemented the Cassandra connector) can comment on how an HBase sink can be made fault-tolerant and what code could be reused for that. Idempotent (put, delete) and non-idempotent modifications (increment, append) have different implications on fault-tolerance semantics. I think it would be a good idea to separate both types of modifications to make this difference more clear. I would be OK updating to HBase 1.1.6 or 1.2.x under the condition that these versions are backwards compatible with 1.1.2. @nielsbasjes, would that be OK for you as well? We have an ongoing discussion about merging the batch and streaming connectors. So this code would end up in the same module as the batch `TableInputFormat` and we should keep the HBase versions synced. ---
Github user delding commented on the issue: https://github.com/apache/flink/pull/2332 Hi @ramkrish86 , CassandraTupleWriteAheadSink extends GenericWriteAheadSink, which deals with the checkpoint mechanism ---
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2332 I agree with @delding here. > Method that does a batch call on Deletes, Gets, Puts, Increments and Appends. The ordering of execution of the actions is not defined. Meaning if you do a Put and a Get in the same {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the Put had put. That is the javadoc for the batch API. Here we don't do a get operation, but the order of execution among mutations is still not guaranteed. Regarding > storing input records in a state backend and flushing to HBase upon receiving a checkpoint barrier Where is this being done in Flink? Just for understanding. ---
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2332 > #2330 updates the version of the batch TableInputFormat to HBase 1.1.2. I think we should use the same version here. Valid point. But is it possible to upgrade to an even newer version like 1.1.6 or the branch-1.2 series? ---
Github user delding commented on the issue: https://github.com/apache/flink/pull/2332 I can change back to HBase 1.1.2. The reason for using 2.0.0-SNAPSHOT is this bug (https://issues.apache.org/jira/browse/HBASE-14963). It's only fixed for versions 1.3.0+ and 2.0.0, and my example has problems running because of it. ---
Github user delding commented on the issue: https://github.com/apache/flink/pull/2332 Hi @fhueske , in HBase, writes to a single row have an ACID guarantee. The exactly-once semantic can be implemented the way CassandraSink did it: storing input records in a state backend and flushing to HBase upon receiving a checkpoint barrier. One thing that might be a concern is that the order of execution of these writes is not defined when making such a batch call. In other words, a write of an earlier record could be observed later, but this could also be true even when sending each write immediately. So what do you think of implementing fault tolerance the same way as the Cassandra sink? ---
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2332 I also noticed that the PR pulls in an HBase SNAPSHOT dependency. This is not possible; we cannot compile against a development version. #2330 updates the version of the batch TableInputFormat to HBase 1.1.2. I think we should use the same version here. ---
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2332 Hi @delding, I'm sorry that I did not mention this earlier, but I just noticed that the `HBaseSink` does not implement any logic for checkpointing and fault tolerance. The checkpointing logic of a sink and the guarantees of the external data store define what kind of emission guarantees a sink function can offer (exactly-once, at-least-once). In order to make this PR mergeable, we need to figure out what kind of guarantees it provides under which assumptions / in which scenarios. As an example, see the [documentation of the Cassandra sink](https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/cassandra.html). This is very important because users will rely on the stated guarantees. I am not very familiar with HBase and what kind of write semantics and transactional guarantees it provides. Can you describe which kind of guarantees an `HBaseSink` can achieve and how it would be integrated with Flink's checkpointing mechanism? Thanks, Fabian ---
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user delding commented on the issue: https://github.com/apache/flink/pull/2332 Hi @fhueske , I have updated this PR to address your comments. With this change, only one MutationActions and two ArrayLists are needed for the entire stream, and MutationActions is now resettable. However, I replaced MutationActions actions(IN value) with void actions(IN value, MutationActions actions) instead of void actions(IN value, List actions) as you suggested, because this way users can still use MutationActions's API to handle the Mutation creation logic, which makes their code easier to write. I also added a connect() method as @ramkrish86 suggested, but didn't explicitly check for the existence of the table in the code, because once Admin#tableExists is added, a NoSuchColumnFamilyException is thrown while running the example. I have no idea why this happens... For now, if the table doesn't exist, a table-not-found exception won't be thrown until HBaseClient#send() is called. ---
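The builder-style callback described above (void actions(IN value, MutationActions actions)) can be illustrated as follows. The classes here are simplified stand-ins that only mirror the shape of the API being discussed, not the PR's actual code:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: the user receives a single, resettable MutationActions
// instance and appends actions to it per record, instead of allocating and
// returning a fresh list for every element of the stream.
public class MutationActionsSketch {

    // Simplified stand-in for the PR's MutationActions builder.
    static class MutationActions {
        private final List<String> actions = new ArrayList<>();
        MutationActions addPut(String row, String cf, String qual, String val) {
            actions.add("PUT " + row + "/" + cf + ":" + qual + "=" + val);
            return this;
        }
        MutationActions addDelete(String row, String cf, String qual) {
            actions.add("DELETE " + row + "/" + cf + ":" + qual);
            return this;
        }
        List<String> getActions() { return actions; }
        void reset() { actions.clear(); } // reused across records, as in the update
    }

    // The user-implemented callback: void actions(IN value, MutationActions actions)
    interface HBaseMapper<IN> {
        void actions(IN value, MutationActions actions);
    }

    public static void main(String[] args) {
        HBaseMapper<String> mapper = (value, acts) ->
                acts.addPut(value, "cf", "word", value).addDelete(value, "cf", "stale");

        MutationActions acts = new MutationActions(); // one instance for the whole stream
        mapper.actions("row-1", acts);
        System.out.println("actions=" + acts.getActions().size());
        acts.reset();
        System.out.println("after-reset=" + acts.getActions().size());
    }
}
```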
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user delding commented on the issue: https://github.com/apache/flink/pull/2332 Hi @ramkrish86 , I squashed my recent commit into the one you reviewed before, then merged the upstream and force-pushed the commits, which is why you only see two commits. The first commit actually includes the changes based on your comments. ---
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2332 @delding I'm not able to see the recent changes at all. Under the 'Commits' tab it shows 2 commits. If I open the merge commit I am not able to see the changes that you are describing here. Can you point me to the right one? ---
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user delding commented on the issue: https://github.com/apache/flink/pull/2332 Hi @ramkrish86 , I squashed the commits before pushing.. Sorry for making those recent changes hard to see. Could you still review this PR when you have time? ---
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2332 How should I view your latest commit? I'm not able to see the changes that you have mentioned above. Thanks @delding ---
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user delding commented on the issue: https://github.com/apache/flink/pull/2332 Hi, I have updated the PR based on @ramkrish86 's comments. 1. renamed MutationActionList to MutationActions, which has a public method createMutations that returns HBase Mutations 2. made MutationAction a private inner class 3. the DeleteColumn action can now delete the latest version of the specified column 4. check that the rowkey is not null when creating Mutations 5. handed over the combination of Delete logic to the HBase server. I also added a check for whether the table exists in HBase; if it doesn't, a table-not-found exception is thrown. ---
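Point 3 refers to HBase's distinction between deleting only the newest cell of a column (Delete#addColumn) and deleting every version of it (Delete#addColumns). A toy model of that distinction, using a plain list as a stand-in for a versioned column rather than the HBase client itself:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of "latest version vs. all versions" delete semantics. The list below
// is an illustrative stand-in for the versions of one HBase column, newest last.
public class DeleteColumnSketch {
    public static void main(String[] args) {
        List<String> versions = new ArrayList<>(Arrays.asList("v@t1", "v@t2", "v@t3"));

        // addColumn-style: drop only the latest version (highest timestamp)
        List<String> afterLatestDelete = new ArrayList<>(versions);
        afterLatestDelete.remove(afterLatestDelete.size() - 1);
        System.out.println("latest-deleted: " + afterLatestDelete);

        // addColumns-style: drop all versions of the column
        List<String> afterAllDelete = new ArrayList<>();
        System.out.println("all-deleted: " + afterAllDelete);
    }
}
```

Handing the combination logic to the server (point 5) means the sink simply accumulates these delete specifications in one Delete per row and lets HBase resolve overlaps.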
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2332 Cool. Let me know if you have any questions or if you need any help. I'll keep an eye on this PR and the Bahir-flink repository. ---
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user delding commented on the issue: https://github.com/apache/flink/pull/2332 Hi @rmetzger , Thanks for your comment. I am totally ok with contributing the connector to Apache Bahir :-) ---
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2332 @delding: Thanks a lot for opening a pull request for a streaming hbase connector to Flink. Would you be okay with contributing the connector to Apache Bahir, instead of Apache Flink? The repository is located here: https://github.com/apache/bahir-flink The Flink community recently decided to move some of our connectors to Apache Bahir [1][2]. If you want, I can also help migrating the pull request over to Bahir. At Bahir, I'll help with the review and make sure it gets merged. Also, once the first Flink connectors are in Bahir, I'm proposing (to the Bahir community) to create a first Bahir-flink release to get the code out. My goal is to have the code out well ahead of the next Flink release. [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Move-Redis-and-Flume-connectors-to-Apache-Bahir-and-redirect-contributions-there-td13102.html [2] http://mail-archives.apache.org/mod_mbox/flink-dev/201608.mbox/%3CCAGr9p8CAN8KQTM6%2B3%2B%3DNv8M3ggYEE9gSqdKaKLQiWsWsKzZ21Q%40mail.gmail.com%3E ---
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user delding commented on the issue: https://github.com/apache/flink/pull/2332 Hi @ramkrish86 , thanks for your reviews and detailed comments :-) I will update this PR soon. ---
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2332 Thanks for the update @delding . I can check this PR more closely tomorrow my time. Had a glance at it. ---
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user delding commented on the issue: https://github.com/apache/flink/pull/2332 BTW, the example runs well on my machine when I set the hbase-client version to 2.0.0-SNAPSHOT, but the travis-ci build failed with a "Failed to collect dependencies at org.apache.hbase:hbase-client:jar:2.0.0-SNAPSHOT" error. So I changed the hbase dependency version back to 1.2.2, which unfortunately has a bug ( https://issues.apache.org/jira/browse/HBASE-14963 ) that makes the example fail to run. The bug is fixed in versions 1.3.0+ and 2.0.0. ---
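Since SNAPSHOT versions cannot be used (they are not in Maven Central and may change at any time), one way to pick up the HBASE-14963 fix once a 1.3.x release is available would be to pin the dependency to a released line. The version below is illustrative, based on the fix versions mentioned above:

```xml
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
  <version>1.3.0</version>
</dependency>
```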
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user delding commented on the issue: https://github.com/apache/flink/pull/2332 Hi, I have updated this PR: added javadoc and an example as @tzulitai advised, and created Enums for PUT, DELETE, INCREMENT and APPEND as @ramkrish86 advised. ---
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user delding commented on the issue: https://github.com/apache/flink/pull/2332 Hi @tzulitai Thank you for all the comments :-) I will update the PR and add an example in the next few days. ---
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2332 @delding Thank you for working on this contribution. Since I'm not that familiar with the HBase client API, I've only skimmed through the code for now. Will try to review in detail later. Btw, since there are no tests for this PR yet, can you describe how you tested it? An example of using the `HBaseSink` will also be helpful (we'd also need to add documentation for the sink, like the other connectors). ---
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user delding commented on the issue: https://github.com/apache/flink/pull/2332 BTW, the failing tests are a result of this PR. ---
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user delding commented on the issue: https://github.com/apache/flink/pull/2332 Hi, this is my second PR to Flink, any comments or suggestions will be very appreciated :-) ---