[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-09-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r79422057
  
--- Diff: flink-streaming-connectors/flink-connector-hbase/src/test/java/org/apache/flink/streaming/connectors/hbase/example/HBaseSinkExample.java ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase.example;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.hbase.HBaseMapper;
+import org.apache.flink.streaming.connectors.hbase.HBaseSink;
+import org.apache.flink.streaming.connectors.hbase.MutationActions;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * This is an example showing how to use the HBaseSink in the Streaming API.
+ *
+ * To run the example you need a local HBase database that has a table "flink-example" with a column family "cf".
+ * In the example, the HBase sink takes an input of type {@link Tuple3} and performs different operations based on the input.
+ * The first field of an input value is used as the row key, the second field is treated as an opcode that
+ * determines which type of HBase operation is performed, and the third field is the value to be written.
+ */
+public class HBaseSinkExample {
+
+   private static final List<Tuple3<String, Integer, Integer>> dataSource = new ArrayList<>(100);
+   private static final String TABLE_NAME = "flink-example";
+   private static final String FAMILY = "cf";
+   private static final String COLUMN1 = "c1";
+   private static final String COLUMN2 = "c2";
+   private static final String ROWKEY_PREFIX = "row-";
+
+   static {
+   Random random = new Random();
+   for (int i = 0; i < 99; i++) {
+   String rowKey = ROWKEY_PREFIX + (i % 20);
+   int opcode = random.nextInt(9);
+   int value = i;
+   dataSource.add(new Tuple3<>(rowKey, opcode, value));
+   }
+   }
+
+   public static void main(String[] args) throws Exception {
+           final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+           DataStream<Tuple3<String, Integer, Integer>> source = env.fromCollection(dataSource);
+
+           source.addSink(new HBaseSink<>(TABLE_NAME, new HBaseMapperExample(FAMILY, COLUMN1, COLUMN2)));
+           env.execute();
+   }
+
+   /**
+* This class implements {@link HBaseMapper}.
+*/
+   private static class HBaseMapperExample implements HBaseMapper<Tuple3<String, Integer, Integer>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private byte[] family;
+   private byte[] col1;
+   private byte[] col2;
+
+   public HBaseMapperExample(String family, String col1, String col2) {
+   this.family = Bytes.toBytes(family);
+   this.col1 = Bytes.toBytes(col1);
+   this.col2 = Bytes.toBytes(col2);
+   }
+
+   @Override
+   public byte[] rowKey(Tuple3<String, Integer, Integer> value) {
+   return Bytes.toBytes(value.f0);
+   }
+
+   @Override
+   public MutationActions actions(Tuple3<String, Integer, Integer> value) {
--- End diff --

I think this example is a bit too complicated. Can you change it to two or 
three different actions?
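
For example, something along these lines would already cover the interesting
cases (addPut, addIncrement and addDeleteRow are assumed MutationActions
helpers here; the exact method names in this PR may differ):

    @Override
    public MutationActions actions(Tuple3<String, Integer, Integer> value) {
        MutationActions actions = new MutationActions();
        // Reduce the opcode range to three cases instead of nine.
        switch (value.f1 % 3) {
            case 0:
                // write the value into column c1
                actions.addPut(family, col1, Bytes.toBytes(value.f2));
                break;
            case 1:
                // increment the counter in column c2 by the value
                actions.addIncrement(family, col2, (long) value.f2);
                break;
            default:
                // delete the whole row
                actions.addDeleteRow();
                break;
        }
        return actions;
    }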



[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-09-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r79420572
  
--- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Maps an input value to a row in an HBase table.
+ *
+ * @param <IN> input type
+ */
+public interface HBaseMapper<IN> extends Function, Serializable {
+
+   /**
+    * Given an input value, return the HBase row key. The row key cannot be null.
+    *
+    * @param value input value
+    * @return row key
+    */
+   byte[] rowKey(IN value);
+
+   /**
+    * Given an input value, return a list of actions to be performed on a single row in an HBase table.
+    *
+    * @param value input value
+    * @return a list of mutation actions
+    */
+   MutationActions actions(IN value);
--- End diff --

if we change the signature to `void actions(IN value, List<MutationAction> 
actions)`, we can reuse the `List` object and do not need the 
`MutationActions` class.
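
A minimal sketch of what that would look like (keeping MutationAction as the
per-action type; the reusable list would live in the sink):

    import java.io.Serializable;
    import java.util.List;
    import org.apache.flink.api.common.functions.Function;

    public interface HBaseMapper<IN> extends Function, Serializable {

        byte[] rowKey(IN value);

        // The sink owns a single List<MutationAction>, clears it before
        // each record and passes it in, so no MutationActions wrapper is
        // allocated per element.
        void actions(IN value, List<MutationAction> actions);
    }

On the sink side, the same list instance would then be reused across
invoke() calls: clear it, pass it to actions(), and convert its contents
into Mutations.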




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-09-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r79419954
  
--- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/MutationActions.java ---
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *  This class represents a list of {@link MutationAction}s you will take when writing
+ *  an input value of {@link HBaseSink} to a row in an HBase table.
+ *  Each {@link MutationAction} can create one of the HBase {@link Mutation} operation types,
+ *  including {@link Put}, {@link Increment}, {@link Append} and {@link Delete}.
+ */
+public class MutationActions {
--- End diff --

Maybe I am missing something, but wouldn't it be easier to have all `addX` 
methods implemented in the HBaseMapper and let them directly return a 
`Mutation`? 
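
That alternative could look roughly like this (a sketch, not the PR's actual
API):

    import java.io.Serializable;
    import java.util.Collections;
    import java.util.List;
    import org.apache.flink.api.common.functions.Function;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public interface HBaseMapper<IN> extends Function, Serializable {

        byte[] rowKey(IN value);

        // Build HBase client mutations directly; no intermediate
        // MutationActions description is needed.
        List<Mutation> mutations(IN value);
    }

    // Example implementation for a Tuple3<String, Integer, Integer> input
    // (family/col1 are byte[] fields as in HBaseMapperExample):
    public List<Mutation> mutations(Tuple3<String, Integer, Integer> v) {
        Put put = new Put(Bytes.toBytes(v.f0));
        put.addColumn(family, col1, Bytes.toBytes(v.f2));
        return Collections.singletonList(put);
    }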




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-09-12 Thread delding
Github user delding commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r78478766
  
--- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Maps an input value to a row in an HBase table.
+ *
+ * @param <IN> input type
+ */
+public interface HBaseMapper<IN> extends Function, Serializable {
+
+   /**
+    * Given an input value, return the HBase row key. The row key cannot be null.
+    *
+    * @param value input value
+    * @return row key
+    */
+   byte[] rowKey(IN value);
--- End diff --

I am planning to update this PR based on your comments and then ping 
@rmetzger for further review.




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-09-12 Thread delding
Github user delding commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r78478455
  
--- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Maps an input value to a row in an HBase table.
+ *
+ * @param <IN> input type
+ */
+public interface HBaseMapper<IN> extends Function, Serializable {
+
+   /**
+    * Given an input value, return the HBase row key. The row key cannot be null.
+    *
+    * @param value input value
+    * @return row key
+    */
+   byte[] rowKey(IN value);
--- End diff --

Hi @ramkrish86 , I was trying to add a connect() method as you suggested and 
to explicitly test whether the table exists, as follows:

    public void connect(String tableName) throws IOException {
        connection = ConnectionFactory.createConnection(hbConfig);
        TableName name = TableName.valueOf(tableName);
        try (Admin admin = connection.getAdmin()) {
            if (!admin.tableExists(name)) {
                throw new RuntimeException("Table " + tableName + " doesn't exist!");
            }
        }
        table = connection.getTable(name);
    }

But once the admin.tableExists(name) call is added, running my example 
throws an exception:

    org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException: Column family 
    table does not exist in region hbase:meta,,1.1588230740 in table 'hbase:meta'

I've never run into this exception in my project that uses the hbase client. 
Do you know the reason for this exception?




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-09-12 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r78422242
  
--- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Maps an input value to a row in an HBase table.
+ *
+ * @param <IN> input type
+ */
+public interface HBaseMapper<IN> extends Function, Serializable {
+
+   /**
+    * Given an input value, return the HBase row key. The row key cannot be null.
+    *
+    * @param value input value
+    * @return row key
+    */
+   byte[] rowKey(IN value);
--- End diff --

> If so, the getRowKey() API would be the same as rowKey(IN value) except 
for a name change.

Maybe yes, that is better. OK, let's see what others have to say. Can you 
ping @rmetzger to get this in and for further reviews?




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-09-12 Thread delding
Github user delding commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r78418366
  
--- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Maps an input value to a row in an HBase table.
+ *
+ * @param <IN> input type
+ */
+public interface HBaseMapper<IN> extends Function, Serializable {
+
+   /**
+    * Given an input value, return the HBase row key. The row key cannot be null.
+    *
+    * @param value input value
+    * @return row key
+    */
+   byte[] rowKey(IN value);
--- End diff --

Hi @ramkrish86 , if we have

    MutationActions mutationActions = HBaseMapper#actions(rowkey, value);

then the rowkey information would be included in the mutationActions object 
being created. So when we call mutationActions.createMutations() there would 
be no need to pass the rowkey as an argument, which I think is totally fine. 
But what I'm confused about is how we derive the rowkey from the input 
value. Should we pass the input value as an argument to the getRowKey() API 
you mentioned? If so, the getRowKey() API would be the same as rowKey(IN 
value) except for a name change.




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-09-09 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r78136477
  
--- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Maps an input value to a row in an HBase table.
+ *
+ * @param <IN> input type
+ */
+public interface HBaseMapper<IN> extends Function, Serializable {
+
+   /**
+    * Given an input value, return the HBase row key. The row key cannot be null.
+    *
+    * @param value input value
+    * @return row key
+    */
+   byte[] rowKey(IN value);
--- End diff --

> In other words, the row key returned from rowKey(IN value) will be passed 
to the createMutations(byte[] rowKey, boolean writeToWAL) method called on 
the instance returned from actions(IN value). What do you think?

I am not entirely convinced about having a rowKey API and using it in 
createMutations. Since HBase allows the row key as a byte[], I think there 
is no harm in exposing a byte[] argument that represents the row key. Hence 
I was of that opinion.




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-09-06 Thread delding
Github user delding commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r77676668
  
--- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Maps an input value to a row in an HBase table.
+ *
+ * @param <IN> input type
+ */
+public interface HBaseMapper<IN> extends Function, Serializable {
+
+   /**
+    * Given an input value, return the HBase row key. The row key cannot be null.
+    *
+    * @param value input value
+    * @return row key
+    */
+   byte[] rowKey(IN value);
--- End diff --

I did forget to add a connect method. Will add one shortly.




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-09-06 Thread delding
Github user delding commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r77676350
  
--- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Maps an input value to a row in an HBase table.
+ *
+ * @param <IN> input type
+ */
+public interface HBaseMapper<IN> extends Function, Serializable {
+
+   /**
+    * Given an input value, return the HBase row key. The row key cannot be null.
+    *
+    * @param value input value
+    * @return row key
+    */
+   byte[] rowKey(IN value);
--- End diff --

The assumption is that for each input value there will be only one 
corresponding row to be modified. Users define how to extract a row key and 
the actions to be performed on that row from a single input value. In other 
words, the row key returned from rowKey(IN value) will be passed to the 
createMutations(byte[] rowKey, boolean writeToWAL) method called on the 
instance returned from actions(IN value).
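
Spelled out as sink-side wiring, that contract amounts to the sketch below
(method names follow the diffs quoted in this thread; the HBaseClient
hand-off method name is assumed):

    // inside HBaseSink#invoke(IN value)
    byte[] rowKey = mapper.rowKey(value);             // one row per input value
    MutationActions actions = mapper.actions(value);  // actions for that row
    List<Mutation> mutations = actions.createMutations(rowKey, writeToWAL);
    client.send(mutations);                           // hand off to HBaseClient (name assumed)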




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-09-05 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r77547915
  
--- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Maps an input value to a row in an HBase table.
+ *
+ * @param <IN> input type
+ */
+public interface HBaseMapper<IN> extends Function, Serializable {
+
+   /**
+    * Given an input value, return the HBase row key. The row key cannot be null.
+    *
+    * @param value input value
+    * @return row key
+    */
+   byte[] rowKey(IN value);
--- End diff --

Rest looks good to me. I am not sure whether the naming of the functions can 
be fine-tuned; I am not very good at naming, and the Flink team may have 
some naming conventions. I think having a connect() method and the above 
comment are the main things from my side. If that is fine, then it is up to 
the Flink team to review this. Thanks @delding .




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-09-05 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r77547840
  
--- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Maps an input value to a row in an HBase table.
+ *
+ * @param <IN> input type
+ */
+public interface HBaseMapper<IN> extends Function, Serializable {
+
+   /**
+    * Given an input value, return the HBase row key. The row key cannot be null.
+    *
+    * @param value input value
+    * @return row key
+    */
+   byte[] rowKey(IN value);
--- End diff --

Fine with all. But how do you relate this rowKey to the one passed to 
createMutations? Maybe it would be better to have actions(byte[] rowkey, IN 
value) and add a getRowKey() API that returns byte[]? That could then be 
used for createMutations.
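
For concreteness, one possible reading of that suggestion (whether
getRowKey() takes the input value is exactly the question debated elsewhere
in this thread):

    public interface HBaseMapper<IN> extends Function, Serializable {

        // derive the row key from the input value
        byte[] getRowKey(IN value);

        // the sink passes the derived key back in, so actions and
        // createMutations agree on the same row
        MutationActions actions(byte[] rowKey, IN value);
    }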




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-09-05 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r77547550
  
--- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseClient.java ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A client class that serves to create a connection and send data to HBase.
+ */
+class HBaseClient implements Closeable {
+
+   private static final Logger LOG = LoggerFactory.getLogger(HBaseClient.class);
+
+   private Connection connection;
+   private Table table;
+
+   public HBaseClient(org.apache.hadoop.conf.Configuration hbConfig, String tableName) throws IOException {
+           connection = ConnectionFactory.createConnection(hbConfig);
+           table = connection.getTable(TableName.valueOf(tableName));
--- End diff --

I am not able to see the connect method. Maybe it is not added?




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-16 Thread delding
Github user delding commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r74886606
  
--- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/MutationActionList.java ---
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *  This class represents a list of {@link MutationAction}s you will take when writing
+ *  an input value of {@link HBaseSink} to a row in an HBase table.
+ *  Each {@link MutationAction} can create one of the HBase {@link Mutation} operation types,
+ *  including {@link Put}, {@link Increment}, {@link Append} and {@link Delete}.
+ */
+public class MutationActionList {
+   private final List<MutationAction> actions;
+
+   public MutationActionList() {
+           this.actions = new ArrayList<>();
+   }
+
+   public List<MutationAction> getActions() {
+           return this.actions;
+   }
+
+   /**
+    * Create a new list of HBase {@link Mutation}s.
+    *
+    * @param rowKey row that the created {@link Mutation} list is applied to
+    * @param writeToWAL enable WAL
+    * @return a list of HBase {@link Mutation}s
+    */
+   public List<Mutation> newMutationList(byte[] rowKey, boolean writeToWAL) {
+           List<Mutation> mutations = new ArrayList<>();
+           Put put = null;
+           Increment increment = null;
+           Append append = null;
+           Delete delete = null;
+           boolean rowIsDeleted = false;
+           for (MutationAction action : actions) {
+                   switch (action.getType()) {
+                           case PUT:
+                                   if (put == null) {
+                                           put = new Put(rowKey);
+                                           mutations.add(put);
+                                   }
+                                   if (action.getTs() == -1) {
+                                           put.addColumn(action.getFamily(), action.getQualifier(), action.getValue());
+                                   } else {
+                                           put.addColumn(action.getFamily(), action.getQualifier(), action.getTs(), action.getValue());
+                                   }
+                                   break;
+
+                           case INCREMENT:
+                                   if (increment == null) {
+                                           increment = new Increment(rowKey);
+                                           mutations.add(increment);
+                                   }
+                                   increment.addColumn(action.getFamily(), action.getQualifier(), action.getIncrement());
+                                   break;
+
+                           case APPEND:
+                                   if (append == null) {
+                                           append = new Append(rowKey);
+                                           mutations.add(append);
+                                   }
+                                   append.add(action.getFamily(), action.getQualifier(), action.getValue());
+                                   break;
+
+                           // If there are multiple DELETE_ROW actions, only the first one is served
+                           case DELETE_ROW:
+                                   if (!rowIsDeleted) {
+

[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-15 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r74878005
  
--- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/MutationActionList.java ---
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *  This class represents a list of {@link MutationAction}s you will take when writing
+ *  an input value of {@link HBaseSink} to a row in an HBase table.
+ *  Each {@link MutationAction} can create one of the HBase {@link Mutation} operation types,
+ *  including {@link Put}, {@link Increment}, {@link Append} and {@link Delete}.
+ */
+public class MutationActionList {
+   private final List<MutationAction> actions;
+
+   public MutationActionList() {
+           this.actions = new ArrayList<>();
+   }
+
+   public List<MutationAction> getActions() {
+           return this.actions;
+   }
+
+   /**
+    * Create a new list of HBase {@link Mutation}s.
+    *
+    * @param rowKey row that the created {@link Mutation} list is applied to
+    * @param writeToWAL enable WAL
+    * @return a list of HBase {@link Mutation}s
+    */
+   public List<Mutation> newMutationList(byte[] rowKey, boolean writeToWAL) {
+           List<Mutation> mutations = new ArrayList<>();
+           Put put = null;
+           Increment increment = null;
+           Append append = null;
+           Delete delete = null;
+           boolean rowIsDeleted = false;
+           for (MutationAction action : actions) {
+                   switch (action.getType()) {
+                           case PUT:
+                                   if (put == null) {
+                                           put = new Put(rowKey);
+                                           mutations.add(put);
+                                   }
+                                   if (action.getTs() == -1) {
+                                           put.addColumn(action.getFamily(), action.getQualifier(), action.getValue());
+                                   } else {
+                                           put.addColumn(action.getFamily(), action.getQualifier(), action.getTs(), action.getValue());
+                                   }
+                                   break;
+
+                           case INCREMENT:
+                                   if (increment == null) {
+                                           increment = new Increment(rowKey);
+                                           mutations.add(increment);
+                                   }
+                                   increment.addColumn(action.getFamily(), action.getQualifier(), action.getIncrement());
+                                   break;
+
+                           case APPEND:
+                                   if (append == null) {
+                                           append = new Append(rowKey);
+                                           mutations.add(append);
+                                   }
+                                   append.add(action.getFamily(), action.getQualifier(), action.getValue());
+                                   break;
+
+                           // If there are multiple DELETE_ROW actions, only the first one is served
+                           case DELETE_ROW:
+                                   if (!rowIsDeleted) {
+

[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-15 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r74877942
  
--- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/MutationActionList.java ---
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *  This class represents a list of {@link MutationAction}s you will take when writing
+ *  an input value of {@link HBaseSink} to a row in an HBase table.
+ *  Each {@link MutationAction} can create one of the HBase {@link Mutation} operation types,
+ *  including {@link Put}, {@link Increment}, {@link Append} and {@link Delete}.
+ */
+public class MutationActionList {
+   private final List<MutationAction> actions;
+
+   public MutationActionList() {
+           this.actions = new ArrayList<>();
+   }
+
+   public List<MutationAction> getActions() {
+           return this.actions;
+   }
+
+   /**
+    * Create a new list of HBase {@link Mutation}s.
+    *
+    * @param rowKey row that the created {@link Mutation} list is applied to
+    * @param writeToWAL enable WAL
+    * @return a list of HBase {@link Mutation}s
+    */
+   public List<Mutation> newMutationList(byte[] rowKey, boolean writeToWAL) {
+           List<Mutation> mutations = new ArrayList<>();
+           Put put = null;
+           Increment increment = null;
+           Append append = null;
+           Delete delete = null;
+           boolean rowIsDeleted = false;
+           for (MutationAction action : actions) {
+                   switch (action.getType()) {
+                           case PUT:
+                                   if (put == null) {
+                                           put = new Put(rowKey);
+                                           mutations.add(put);
+                                   }
+                                   if (action.getTs() == -1) {
+                                           put.addColumn(action.getFamily(), action.getQualifier(), action.getValue());
+                                   } else {
+                                           put.addColumn(action.getFamily(), action.getQualifier(), action.getTs(), action.getValue());
+                                   }
+                                   break;
+
+                           case INCREMENT:
+                                   if (increment == null) {
+                                           increment = new Increment(rowKey);
+                                           mutations.add(increment);
+                                   }
+                                   increment.addColumn(action.getFamily(), action.getQualifier(), action.getIncrement());
+                                   break;
+
+                           case APPEND:
+                                   if (append == null) {
+                                           append = new Append(rowKey);
+                                           mutations.add(append);
+                                   }
+                                   append.add(action.getFamily(), action.getQualifier(), action.getValue());
+                                   break;
+
+                           // If there are multiple DELETE_ROW actions, only the first one is served
+                           case DELETE_ROW:
+                                   if (!rowIsDeleted) {
+

[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-15 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r74874857
  
--- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseSink.java ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A sink that writes its input to an HBase table.
+ * To create this sink you need to pass two arguments: the name of the HBase table and an {@link HBaseMapper}.
+ * A boolean field writeToWAL can also be set to enable or disable the Write Ahead Log (WAL).
+ * HBase config files must be located in the classpath to create a connection to the HBase cluster.
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public class HBaseSink<IN> extends RichSinkFunction<IN> {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(HBaseSink.class);
+
+   private transient HBaseClient client;
+   private String tableName;
+   private HBaseMapper<IN> mapper;
+   private boolean writeToWAL = true;
--- End diff --

My bad. I missed this. It is there as you said. 
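
For reference, the usage implied by the javadoc above, with the
two-argument constructor shown in HBaseSinkExample earlier in this digest
(a writeToWAL variant is implied by the javadoc but its exact shape is not
shown here):

    DataStream<Tuple3<String, Integer, Integer>> source = ...;
    source.addSink(new HBaseSink<>("flink-example",
            new HBaseMapperExample("cf", "c1", "c2")));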




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-14 Thread delding
Github user delding commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r74711908
  
--- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseClient.java ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A client class that serves to create a connection and send data to HBase.
+ */
+class HBaseClient implements Closeable {
+
+   private static final Logger LOG = LoggerFactory.getLogger(HBaseClient.class);
+
+   private Connection connection;
+   private Table table;
+
+   public HBaseClient(org.apache.hadoop.conf.Configuration hbConfig, String tableName) throws IOException {
+           connection = ConnectionFactory.createConnection(hbConfig);
+           table = connection.getTable(TableName.valueOf(tableName));
--- End diff --

You are right, I will do it.




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-14 Thread delding
Github user delding commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r74711889
  
--- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/MutationActionList.java ---
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *  This class represents a list of {@link MutationAction}s you will take when writing
+ *  an input value of {@link HBaseSink} to a row in an HBase table.
+ *  Each {@link MutationAction} can create one of the HBase {@link Mutation} operation types,
+ *  including {@link Put}, {@link Increment}, {@link Append} and {@link Delete}.
+ */
+public class MutationActionList {
+   private final List<MutationAction> actions;
+
+   public MutationActionList() {
+           this.actions = new ArrayList<>();
+   }
+
+   public List<MutationAction> getActions() {
+           return this.actions;
+   }
+
+   /**
+    * Create a new list of HBase {@link Mutation}s.
+    *
+    * @param rowKey row that the created {@link Mutation} list is applied to
+    * @param writeToWAL enable WAL
+    * @return a list of HBase {@link Mutation}s
+    */
+   public List<Mutation> newMutationList(byte[] rowKey, boolean writeToWAL) {
+           List<Mutation> mutations = new ArrayList<>();
+           Put put = null;
+           Increment increment = null;
+           Append append = null;
+           Delete delete = null;
+           boolean rowIsDeleted = false;
+           for (MutationAction action : actions) {
+                   switch (action.getType()) {
+                           case PUT:
+                                   if (put == null) {
+                                           put = new Put(rowKey);
+                                           mutations.add(put);
+                                   }
+                                   if (action.getTs() == -1) {
+                                           put.addColumn(action.getFamily(), action.getQualifier(), action.getValue());
+                                   } else {
+                                           put.addColumn(action.getFamily(), action.getQualifier(), action.getTs(), action.getValue());
+                                   }
+                                   break;
+
+                           case INCREMENT:
+                                   if (increment == null) {
+                                           increment = new Increment(rowKey);
+                                           mutations.add(increment);
+                                   }
+                                   increment.addColumn(action.getFamily(), action.getQualifier(), action.getIncrement());
+                                   break;
+
+                           case APPEND:
+                                   if (append == null) {
+                                           append = new Append(rowKey);
+                                           mutations.add(append);
+                                   }
+                                   append.add(action.getFamily(), action.getQualifier(), action.getValue());
+                                   break;
+
+                           // If there are multiple DELETE_ROW actions, only the first one is served
+                           case DELETE_ROW:
+                                   if (!rowIsDeleted) {
+

[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-14 Thread delding
Github user delding commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r74711637
  
--- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/MutationActionList.java ---
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *  This class represents a list of {@link MutationAction}s you will take when writing
+ *  an input value of {@link HBaseSink} to a row in an HBase table.
+ *  Each {@link MutationAction} can create one of the HBase {@link Mutation} operation types,
+ *  including {@link Put}, {@link Increment}, {@link Append} and {@link Delete}.
+ */
+public class MutationActionList {
+   private final List<MutationAction> actions;
+
+   public MutationActionList() {
+           this.actions = new ArrayList<>();
+   }
+
+   public List<MutationAction> getActions() {
+           return this.actions;
+   }
+
+   /**
+    * Create a new list of HBase {@link Mutation}s.
+    *
+    * @param rowKey row that the created {@link Mutation} list is applied to
+    * @param writeToWAL enable WAL
+    * @return a list of HBase {@link Mutation}s
+    */
+   public List<Mutation> newMutationList(byte[] rowKey, boolean writeToWAL) {
+           List<Mutation> mutations = new ArrayList<>();
+           Put put = null;
+           Increment increment = null;
+           Append append = null;
+           Delete delete = null;
+           boolean rowIsDeleted = false;
+           for (MutationAction action : actions) {
+                   switch (action.getType()) {
+                           case PUT:
+                                   if (put == null) {
+                                           put = new Put(rowKey);
+                                           mutations.add(put);
+                                   }
+                                   if (action.getTs() == -1) {
+                                           put.addColumn(action.getFamily(), action.getQualifier(), action.getValue());
+                                   } else {
+                                           put.addColumn(action.getFamily(), action.getQualifier(), action.getTs(), action.getValue());
+                                   }
+                                   break;
+
+                           case INCREMENT:
+                                   if (increment == null) {
+                                           increment = new Increment(rowKey);
+                                           mutations.add(increment);
+                                   }
+                                   increment.addColumn(action.getFamily(), action.getQualifier(), action.getIncrement());
+                                   break;
+
+                           case APPEND:
+                                   if (append == null) {
+                                           append = new Append(rowKey);
+                                           mutations.add(append);
+                                   }
+                                   append.add(action.getFamily(), action.getQualifier(), action.getValue());
+                                   break;
+
+                           // If there are multiple DELETE_ROW actions, only the first one is served
+                           case DELETE_ROW:
+                                   if (!rowIsDeleted) {
+

[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-14 Thread delding
Github user delding commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r74711470
  
--- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/MutationActionList.java ---

[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-14 Thread delding
Github user delding commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r74711244
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/MutationActionList.java
 ---

[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-14 Thread delding
Github user delding commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r74711022
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/MutationActionList.java
 ---

[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-14 Thread delding
Github user delding commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r74710964
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/MutationActionList.java
 ---

[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-12 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r74549599
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/MutationActionList.java
 ---

[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-12 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r74551046
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseClient.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A client class that creates the connection and sends data to HBase.
+ */
+class HBaseClient implements Closeable {
+
+	private static final Logger LOG = LoggerFactory.getLogger(HBaseClient.class);
+
+	private Connection connection;
+	private Table table;
+
+	public HBaseClient(org.apache.hadoop.conf.Configuration hbConfig, String tableName) throws IOException {
+   connection = ConnectionFactory.createConnection(hbConfig);
+   table = connection.getTable(TableName.valueOf(tableName));
--- End diff --

Have you tested what happens when there is no table available with that name? Maybe you 
need a connect() API like in CassandraSink, so that if connecting fails there is 
no point in continuing with the rest, and it gives you a clear API 
specifically for connecting. 
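
A sketch of that suggestion, assuming the standard HBase 1.x Admin API; the method name connect() and the error message are illustrative, not from the PR:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

class HBaseConnectSketch {
	// Fail fast if the target table is missing, instead of failing later on the first write.
	static Connection connect(Configuration hbConfig, String tableName) throws IOException {
		Connection connection = ConnectionFactory.createConnection(hbConfig);
		try (Admin admin = connection.getAdmin()) {
			if (!admin.tableExists(TableName.valueOf(tableName))) {
				throw new IOException("HBase table '" + tableName + "' does not exist");
			}
		} catch (IOException e) {
			// Do not leak the connection when the check fails.
			connection.close();
			throw e;
		}
		return connection;
	}
}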




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-12 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r74550914
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/MutationActionList.java
 ---

[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-12 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r74550400
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/MutationActionList.java
 ---

[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-12 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r74550242
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/MutationActionList.java
 ---

[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-12 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r74549919
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/MutationActionList.java
 ---

[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-12 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r74549521
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/MutationActionList.java
 ---

[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-12 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r74549333
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/MutationActionList.java
 ---
+				// If there are multiple DELETE_ROW actions, only the first one is served
--- End diff --

Maybe take the last one rather than the first one.
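
A sketch of the last-one-wins alternative; the qualified enum name MutationAction.Type is an assumption, since the diff only shows the unqualified constants:

import java.util.List;

class DeleteRowResolution {
	// Index of the last DELETE_ROW action, or -1 if there is none; the switch in
	// newMutationList could then add the row Delete only when it reaches this index.
	static int lastDeleteRowIndex(List<MutationAction> actions) {
		int last = -1;
		for (int i = 0; i < actions.size(); i++) {
			if (actions.get(i).getType() == MutationAction.Type.DELETE_ROW) {
				last = i;
			}
		}
		return last;
	}
}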



[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-08 Thread delding
Github user delding commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r73939778
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseSink.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+public class HBaseSink<IN> extends RichSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(HBaseSink.class);
+
+	private transient HBaseClient client;
+	private String tableName;
+	private HBaseMapper<IN> mapper;
+	private boolean writeToWAL = true;
+
+	public HBaseSink(String tableName, HBaseMapper<IN> mapper) {
+		Preconditions.checkArgument(!Strings.isNullOrEmpty(tableName), "Table name cannot be null or empty");
+		Preconditions.checkArgument(mapper != null, "HBase mapper cannot be null");
+		this.tableName = tableName;
+		this.mapper = mapper;
+	}
+
+	public HBaseSink<IN> writeToWAL(boolean writeToWAL) {
+		this.writeToWAL = writeToWAL;
+		return this;
+	}
+
+	@Override
+	public void open(Configuration configuration) throws Exception {
+		try {
+			// use config files found in the classpath
+			client = new HBaseClient(HBaseConfiguration.create(), tableName);
+		} catch (IOException e) {
+			throw new RuntimeException("HBase sink preparation failed.", e);
+		}
+	}
+
+	@Override
+	public void invoke(IN value) throws Exception {
+		byte[] rowKey = mapper.rowKey(value);
+		List<Mutation> mutations = Lists.newArrayList();
+		for (HBaseMapper.HBaseColumn column : mapper.columns(value)) {
+			Mutation mutation;
+			if (column.isStandard()) {
--- End diff --

Thanks for these helpful comments :-)




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-07 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r73821038
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseSink.java
 ---
+		for (HBaseMapper.HBaseColumn column : mapper.columns(value)) {
+			Mutation mutation;
+			if (column.isStandard()) {
--- End diff --

Better to create enums for PUT, DELETE, INCREMENT and APPEND and handle 
all of them here, so that Table#batch() will work for all? But Deletes are much 
more complicated, because there are ROW deletes, COL deletes, family deletes, 
and deletes with specific versions. 
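
A rough sketch of the enum idea; the names are illustrative, not from the PR:

// One possible action enum covering the suggested operations, including
// the finer-grained delete variants mentioned above.
public enum MutationActionType {
	PUT,
	INCREMENT,
	APPEND,
	DELETE_ROW,      // delete the entire row
	DELETE_FAMILY,   // delete all columns of one family
	DELETE_COLUMNS,  // delete all versions of one column
	DELETE_COLUMN    // delete one specific version of a column
}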




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-07 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r73820969
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseSink.java
 ---
+			if (column.isStandard()) {
+				mutation = new Put(rowKey);
+				if (column.getTs() == -1) {
+					((Put) mutation).addColumn(column.getFamily(), column.getQualifier(), column.getValue());
--- End diff --

So the -1 is handled here; ignore the earlier comment about ts being set to -1.




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-06 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r73788575
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Maps an input value to a row in an HBase table.
+ *
+ * @param <IN> input type
+ */
+public interface HBaseMapper<IN> extends Serializable {
+
+	/**
+	 * Given an input value, return the HBase row key.
+	 *
+	 * @param value
+	 * @return row key
+	 */
+	byte[] rowKey(IN value);
+
+	/**
+	 * Given an input value, return a list of HBase columns.
+	 *
+	 * @param value
+	 * @return a list of HBase columns
+	 */
+	List<HBaseColumn> columns(IN value);
+
+	/**
+	 * Represents an HBase column which can be either a standard one or a counter.
+	 */
+	class HBaseColumn {
+		private byte[] family;
+		private byte[] qualifier;
+		private byte[] value;
+		private long timestamp;
+		private Long increment;
+
+		public HBaseColumn(byte[] family, byte[] qualifier, byte[] value) {
+			this(family, qualifier, value, -1);
+		}
+
+		public HBaseColumn(byte[] family, byte[] qualifier, byte[] value, long timestamp) {
+			this.family = family;
+			this.qualifier = qualifier;
+			this.value = value;
+			this.timestamp = timestamp;
+		}
+
+		public HBaseColumn(byte[] family, byte[] qualifier, long increment) {
+			this.family = family;
+			this.qualifier = qualifier;
+			this.increment = increment;
+		}
+
+		public byte[] getFamily() {
+			return family;
+		}
+
+		public byte[] getQualifier() {
+			return qualifier;
+		}
+
+		public byte[] getValue() {
+			return value;
+		}
+
+		public long getTs() {
+			return timestamp;
+		}
+
+		public Long getIncrement() {
+			return increment;

The reason for saying this is that even Append is also a Mutation.
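
For illustration: because Put, Increment, Append and Delete all extend Mutation, a single List<Mutation> can carry a mix of them into Table#batch(). The table handle and the column names below are assumptions:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

class MixedBatchSketch {
	static void writeMixed(Table table, byte[] row) throws Exception {
		byte[] cf = Bytes.toBytes("cf");
		List<Mutation> mutations = new ArrayList<>();
		mutations.add(new Put(row).addColumn(cf, Bytes.toBytes("c1"), Bytes.toBytes("v")));
		mutations.add(new Increment(row).addColumn(cf, Bytes.toBytes("cnt"), 1L));
		mutations.add(new Append(row).add(cf, Bytes.toBytes("log"), Bytes.toBytes("x")));
		mutations.add(new Delete(row)); // deletes the whole row
		table.batch(mutations, new Object[mutations.size()]);
	}
}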




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-06 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r73788502
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java
 ---
@@ -0,0 +1,97 @@
+/* (Apache License 2.0 header omitted) */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Maps an input value to a row in an HBase table.
+ *
+ * @param <IN> input type
+ */
+public interface HBaseMapper<IN> extends Serializable {
+
+   /**
+* Given an input value, return the HBase row key.
+*
+* @param value
+* @return row key
+*/
+   byte[] rowKey(IN value);
+
+   /**
+* Given an input value, return a list of HBase columns.
+*
+* @param value
+* @return a list of HBase columns
+*/
+   List<HBaseColumn> columns(IN value);
+
+   /**
+*  Represents an HBase column, which can be either a standard one or a counter.
+*/
+   class HBaseColumn {
+   private byte[] family;
+   private byte[] qualifier;
+   private byte[] value;
+   private long timestamp;
+   private Long increment;
+
+   public HBaseColumn(byte[] family, byte[] qualifier, byte[] value) {
+   this(family, qualifier, value, -1);
+   }
+
+   public HBaseColumn(byte[] family, byte[] qualifier, byte[] value, long timestamp) {
+   this.family = family;
+   this.qualifier = qualifier;
+   this.value = value;
+   this.timestamp = timestamp;
+   }
+
+   public HBaseColumn(byte[] family, byte[] qualifier, long increment) {
+   this.family = family;
+   this.qualifier = qualifier;
+   this.increment = increment;
+   }
+
+   public byte[] getFamily() {
+   return family;
+   }
+
+   public byte[] getQualifier() {
+   return qualifier;
+   }
+
+   public byte[] getValue() {
+   return value;
+   }
+
+   public long getTs() {
+   return timestamp;
+   }
+
+   public Long getIncrement() {
+   return increment;
--- End diff --

Maybe we can add an explicit API for increment, instead of having a boolean to identify it?
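One hypothetical shape for that, with names that are mine and not from this PR: replace the overloaded constructors with factory methods, so the call site states which kind of column it builds, and the fields can be final as suggested elsewhere in this review:

    class HBaseColumn {
        private final byte[] family;
        private final byte[] qualifier;
        private final byte[] value;     // null for counters
        private final long timestamp;   // -1 means "let HBase assign"
        private final Long increment;   // null for standard columns

        private HBaseColumn(byte[] family, byte[] qualifier, byte[] value, long timestamp, Long increment) {
            this.family = family;
            this.qualifier = qualifier;
            this.value = value;
            this.timestamp = timestamp;
            this.increment = increment;
        }

        static HBaseColumn standard(byte[] family, byte[] qualifier, byte[] value, long timestamp) {
            return new HBaseColumn(family, qualifier, value, timestamp, null);
        }

        static HBaseColumn counter(byte[] family, byte[] qualifier, long amount) {
            return new HBaseColumn(family, qualifier, null, -1, amount);
        }

        boolean isCounter() {
            return increment != null;
        }
    }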




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-06 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r73788475
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java
 ---
@@ -0,0 +1,97 @@
+/* (Apache License 2.0 header omitted) */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Maps an input value to a row in an HBase table.
+ *
+ * @param <IN> input type
+ */
+public interface HBaseMapper<IN> extends Serializable {
+
+   /**
+* Given an input value, return the HBase row key.
+*
+* @param value
+* @return row key
+*/
+   byte[] rowKey(IN value);
+
+   /**
+* Given an input value, return a list of HBase columns.
+*
+* @param value
+* @return a list of HBase columns
+*/
+   List<HBaseColumn> columns(IN value);
+
+   /**
+*  Represents an HBase column, which can be either a standard one or a counter.
+*/
+   class HBaseColumn {
+   private byte[] family;
+   private byte[] qualifier;
+   private byte[] value;
+   private long timestamp;
+   private Long increment;
+
+   public HBaseColumn(byte[] family, byte[] qualifier, byte[] value) {
+   this(family, qualifier, value, -1);
--- End diff --

Should this be -1? Ideally, if there is no ts, HBase will assign the server-based timestamp.
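For reference, the HBase client's own sentinel for "let the server assign the timestamp" is HConstants.LATEST_TIMESTAMP (Long.MAX_VALUE), not -1; a standalone sketch against the 1.x client:

    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class TimestampDefault {
        public static void main(String[] args) {
            Put put = new Put(Bytes.toBytes("row-1"));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("c1"), Bytes.toBytes("v"));
            // Prints true: a Put without an explicit ts carries LATEST_TIMESTAMP,
            // and the region server replaces it at write time.
            System.out.println(put.getTimeStamp() == HConstants.LATEST_TIMESTAMP);
        }
    }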




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-05 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r73779379
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseSink.java
 ---
@@ -0,0 +1,100 @@
+/* (Apache License 2.0 header omitted) */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
--- End diff --

Can you try removing the Guava dependencies? In Flink we try to avoid using 
Guava because of dependency issues.
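For what it's worth, the two Guava calls visible above have trivial JDK replacements, roughly (a sketch, not the PR's code):

    // Strings.isNullOrEmpty(tableName)  ->  tableName == null || tableName.isEmpty()
    // Lists.newArrayList()              ->  new java.util.ArrayList<>()
    Preconditions.checkArgument(tableName != null && !tableName.isEmpty(),
            "Table name cannot be null or empty");
    List<Mutation> mutations = new ArrayList<>();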




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-05 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r73779331
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseClient.java
 ---
@@ -0,0 +1,66 @@
+/* (Apache License 2.0 header omitted) */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+public class HBaseClient implements Closeable {
--- End diff --

Can you also add a simple Javadoc for this class?
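Something like the following would do; the wording is only a suggestion, inferred from the imports visible in this diff:

    /**
     * A thin wrapper around an HBase {@link Connection}/{@link Table} pair that
     * the sink uses to connect to a table and apply batches of {@link Mutation}s.
     */
    public class HBaseClient implements Closeable {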




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-05 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r73779327
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java
 ---
@@ -0,0 +1,97 @@
+/* (Apache License 2.0 header omitted) */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Maps an input value to a row in an HBase table.
+ *
+ * @param <IN> input type
+ */
+public interface HBaseMapper<IN> extends Serializable {
+
+   /**
+* Given an input value, return the HBase row key.
+*
+* @param value
+* @return row key
+*/
+   byte[] rowKey(IN value);
+
+   /**
+* Given an input value, return a list of HBase columns.
+*
+* @param value
+* @return a list of HBase columns
+*/
+   List<HBaseColumn> columns(IN value);
+
+   /**
+*  Represents an HBase column, which can be either a standard one or a counter.
+*/
+   class HBaseColumn {
+   private byte[] family;
+   private byte[] qualifier;
+   private byte[] value;
+   private long timestamp;
+   private Long increment;
--- End diff --

I would make these `final`.




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-05 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r73779299
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseSink.java
 ---
@@ -0,0 +1,100 @@
+/* (Apache License 2.0 header omitted) */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+public class HBaseSink<IN> extends RichSinkFunction<IN> {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(HBaseSink.class);
+
+   private transient HBaseClient client;
+   private String tableName;
+   private HBaseMapper<IN> mapper;
+   private boolean writeToWAL = true;
+
+   public HBaseSink(String tableName, HBaseMapper<IN> mapper) {
+   Preconditions.checkArgument(!Strings.isNullOrEmpty(tableName), "Table name cannot be null or empty");
+   Preconditions.checkArgument(mapper != null, "HBase mapper cannot be null");
+   this.tableName = tableName;
+   this.mapper = mapper;
+   }
+
+   public HBaseSink<IN> writeToWAL(boolean writeToWAL) {
--- End diff --

Again, Javadoc for public facing interfaces.
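For example (suggested wording; the durability trade-off is an assumption based on the method name):

    /**
     * Enables or disables writing to HBase's write-ahead log (WAL).
     * Disabling the WAL trades durability for write throughput.
     *
     * @param writeToWAL true if mutations should be written to the WAL
     * @return this sink, for method chaining
     */
    public HBaseSink<IN> writeToWAL(boolean writeToWAL) {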




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-05 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r73779290
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseSink.java
 ---
@@ -0,0 +1,100 @@
+/* (Apache License 2.0 header omitted) */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+public class HBaseSink<IN> extends RichSinkFunction<IN> {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(HBaseSink.class);
+
+   private transient HBaseClient client;
+   private String tableName;
+   private HBaseMapper<IN> mapper;
+   private boolean writeToWAL = true;
+
+   public HBaseSink(String tableName, HBaseMapper<IN> mapper) {
--- End diff --

Will need Javadocs for the constructor too :)
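For example (suggested wording only):

    /**
     * Creates a new {@link HBaseSink} that writes its input to the given table.
     *
     * @param tableName name of the HBase table to write to
     * @param mapper    maps input records to a row key and a list of columns
     */
    public HBaseSink(String tableName, HBaseMapper<IN> mapper) {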




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-05 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r73779255
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseSink.java
 ---
@@ -0,0 +1,100 @@
+/* (Apache License 2.0 header omitted) */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+public class HBaseSink<IN> extends RichSinkFunction<IN> {
--- End diff --

Since this is public facing, we should have Javadocs for this class.
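A possible starting point (suggested wording):

    /**
     * A streaming sink that writes records of type {@code IN} to an HBase table.
     * A user-provided {@link HBaseMapper} turns each record into a row key and
     * a list of columns, which are applied to the table as {@link Mutation}s.
     */
    public class HBaseSink<IN> extends RichSinkFunction<IN> {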




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-05 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r73779190
  
--- Diff: flink-streaming-connectors/flink-connector-hbase/pom.xml ---
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-streaming-connectors</artifactId>
+		<version>1.1-SNAPSHOT</version>
--- End diff --

We've just bumped the version to 1.2-SNAPSHOT.
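i.e., the parent block quoted above would then read (assuming nothing else changes):

    <parent>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-connectors</artifactId>
        <version>1.2-SNAPSHOT</version>
    </parent>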




[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-04 Thread delding
GitHub user delding opened a pull request:

https://github.com/apache/flink/pull/2332

[FLINK-2055] Implement Streaming HBaseSink

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/delding/flink FLINK-2055

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2332.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2332


commit 2a9e86943dbb98a5ea5ed60495fc4acede900ddb
Author: erli ding 
Date:   2016-08-03T00:57:19Z

[FLINK-2055] Implement Streaming HBaseSink



