http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
new file mode 100644
index 0000000..1801c3e
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class HadoopReduceFunctionITCase extends MultipleProgramsTestBase {
+
+       public HadoopReduceFunctionITCase(ExecutionMode mode){
+               super(mode);
+       }
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Test
+       public void testStandardGrouping() throws Exception{
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env).
+                               map(new Mapper1());
+
+               DataSet<Tuple2<IntWritable, IntWritable>> commentCnts = ds.
+                               groupBy(0).
+                               reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(new CommentCntReducer()));
+
+               String resultPath = tempFolder.newFile().toURI().toString();
+
+               commentCnts.writeAsText(resultPath);
+               env.execute();
+
+               String expected = "(0,0)\n"+
+                               "(1,3)\n" +
+                               "(2,5)\n" +
+                               "(3,5)\n" +
+                               "(4,2)\n";
+
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+
+       @Test
+       public void testUngroupedHadoopReducer() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
+
+               DataSet<Tuple2<IntWritable, IntWritable>> commentCnts = ds.
+                               reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(new AllCommentCntReducer()));
+
+               String resultPath = tempFolder.newFile().toURI().toString();
+
+               commentCnts.writeAsText(resultPath);
+               env.execute();
+
+               String expected = "(42,15)\n";
+
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+
+       @Test
+       public void testConfigurationViaJobConf() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               JobConf conf = new JobConf();
+               conf.set("my.cntPrefix", "Hello");
+
+               DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env).
+                               map(new Mapper2());
+
+               DataSet<Tuple2<IntWritable, IntWritable>> helloCnts = ds.
+                               groupBy(0).
+                               reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
+                                               new ConfigurableCntReducer(), conf));
+
+               String resultPath = tempFolder.newFile().toURI().toString();
+
+               helloCnts.writeAsText(resultPath);
+               env.execute();
+
+               String expected = "(0,0)\n"+
+                               "(1,0)\n" +
+                               "(2,1)\n" +
+                               "(3,1)\n" +
+                               "(4,1)\n";
+
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+       
+       public static class CommentCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
+               
+               @Override
+               public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
+                               throws IOException {
+                       int commentCnt = 0;
+                       while(vs.hasNext()) {
+                               String v = vs.next().toString();
+                               if(v.startsWith("Comment")) {
+                                       commentCnt++;
+                               }
+                       }
+                       out.collect(k, new IntWritable(commentCnt));
+               }
+               
+               @Override
+               public void configure(final JobConf arg0) { }
+
+               @Override
+               public void close() throws IOException { }
+       }
+       
+       public static class AllCommentCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
+               
+               @Override
+               public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
+                               throws IOException {
+                       int commentCnt = 0;
+                       while(vs.hasNext()) {
+                               String v = vs.next().toString();
+                               if(v.startsWith("Comment")) {
+                                       commentCnt++;
+                               }
+                       }
+                       out.collect(new IntWritable(42), new IntWritable(commentCnt));
+               }
+               
+               @Override
+               public void configure(final JobConf arg0) { }
+
+               @Override
+               public void close() throws IOException { }
+       }
+       
+       public static class ConfigurableCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
+               private String countPrefix;
+               
+               @Override
+               public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
+                               throws IOException {
+                       int commentCnt = 0;
+                       while(vs.hasNext()) {
+                               String v = vs.next().toString();
+                               if(v.startsWith(this.countPrefix)) {
+                                       commentCnt++;
+                               }
+                       }
+                       out.collect(k, new IntWritable(commentCnt));
+               }
+               
+               @Override
+               public void configure(final JobConf c) { 
+                       this.countPrefix = c.get("my.cntPrefix");
+               }
+
+               @Override
+               public void close() throws IOException { }
+       }
+
+       public static class Mapper1 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>> {
+               private static final long serialVersionUID = 1L;
+               @Override
+               public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v)
+               throws Exception {
+                       v.f0 = new IntWritable(v.f0.get() / 5);
+                       return v;
+               }
+       }
+
+       public static class Mapper2 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>> {
+               private static final long serialVersionUID = 1L;
+               @Override
+               public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v)
+               throws Exception {
+                       v.f0 = new IntWritable(v.f0.get() % 5);
+                       return v;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
new file mode 100644
index 0000000..eed6f8f
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+
+public class HadoopTestData {
+
+       public static DataSet<Tuple2<IntWritable, Text>> getKVPairDataSet(ExecutionEnvironment env) {
+
+               List<Tuple2<IntWritable, Text>> data = new ArrayList<Tuple2<IntWritable, Text>>();
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(1),new Text("Hi")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(2),new Text("Hello")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(3),new Text("Hello world")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(4),new Text("Hello world, how are you?")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(5),new Text("I am fine.")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(6),new Text("Luke Skywalker")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(7),new Text("Comment#1")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(8),new Text("Comment#2")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(9),new Text("Comment#3")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(10),new Text("Comment#4")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(11),new Text("Comment#5")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(12),new Text("Comment#6")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(13),new Text("Comment#7")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(14),new Text("Comment#8")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(15),new Text("Comment#9")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(16),new Text("Comment#10")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(17),new Text("Comment#11")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(18),new Text("Comment#12")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(19),new Text("Comment#13")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(20),new Text("Comment#14")));
+               data.add(new Tuple2<IntWritable, Text>(new IntWritable(21),new Text("Comment#15")));
+               
+               Collections.shuffle(data);
+               
+               return env.fromCollection(data);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java
new file mode 100644
index 0000000..fe7ea8e
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.test.hadoopcompatibility.mapred.record;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.hadoopcompatibility.mapred.record.example.WordCountWithOutputFormat;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.RecordAPITestBase;
+
+/**
+ * Tests the Hadoop input format and output format wrappers.
+ */
+public class HadoopRecordInputOutputITCase extends RecordAPITestBase {
+       protected String textPath;
+       protected String resultPath;
+       protected String counts;
+
+       @Override
+       protected void preSubmit() throws Exception {
+               textPath = createTempFile("text.txt", WordCountData.TEXT);
+               resultPath = getTempDirPath("result");
+               counts = WordCountData.COUNTS.replaceAll(" ", "\t");
+       }
+
+       @Override
+       protected Plan getTestJob() {
+               // WordCountWithOutputFormat reads its input via the Hadoop TextInputFormat and writes its output via the Hadoop TextOutputFormat
+               WordCountWithOutputFormat wc = new WordCountWithOutputFormat();
+               return wc.getPlan("1", textPath, resultPath);
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               // Test results, append /1 to resultPath due to the generated _temporary file.
+               compareResultsByLinesInMemory(counts, resultPath + "/1");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
new file mode 100644
index 0000000..2592b88
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred.wrapper;
+
+import java.util.ArrayList;
+import java.util.NoSuchElementException;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
+import org.apache.hadoop.io.IntWritable;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class HadoopTupleUnwrappingIteratorTest {
+
+       @Test
+       public void testValueIterator() {
+               
+               HadoopTupleUnwrappingIterator<IntWritable, IntWritable> valIt =
+                               new HadoopTupleUnwrappingIterator<IntWritable, IntWritable>(IntWritable.class);
+               
+               // many values
+               
+               ArrayList<Tuple2<IntWritable, IntWritable>> tList = new ArrayList<Tuple2<IntWritable, IntWritable>>();
+               tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(1)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(2)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(3)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(4)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(5)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(6)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(7)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(8)));
+               
+               int expectedKey = 1;
+               int[] expectedValues = new int[] {1,2,3,4,5,6,7,8};
+               
+               valIt.set(tList.iterator());
+               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+               for(int expectedValue : expectedValues) {
+                       Assert.assertTrue(valIt.hasNext());
+                       Assert.assertTrue(valIt.hasNext());
+                       Assert.assertTrue(valIt.next().get() == expectedValue);
+                       Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+               }
+               Assert.assertFalse(valIt.hasNext());
+               Assert.assertFalse(valIt.hasNext());
+               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+               
+               // one value
+               
+               tList.clear();
+               tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(2),new IntWritable(10)));
+               
+               expectedKey = 2;
+               expectedValues = new int[]{10};
+               
+               valIt.set(tList.iterator());
+               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+               for(int expectedValue : expectedValues) {
+                       Assert.assertTrue(valIt.hasNext());
+                       Assert.assertTrue(valIt.hasNext());
+                       Assert.assertTrue(valIt.next().get() == expectedValue);
+                       Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+               }
+               Assert.assertFalse(valIt.hasNext());
+               Assert.assertFalse(valIt.hasNext());
+               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+               
+               // more values
+               
+               tList.clear();
+               tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(10)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(4)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(7)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(9)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(21)));
+               
+               expectedKey = 3;
+               expectedValues = new int[]{10,4,7,9,21};
+               
+               valIt.set(tList.iterator());
+               Assert.assertTrue(valIt.hasNext());
+               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+               for(int expectedValue : expectedValues) {
+                       Assert.assertTrue(valIt.hasNext());
+                       Assert.assertTrue(valIt.hasNext());
+                       Assert.assertTrue(valIt.next().get() == expectedValue);
+                       Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+               }
+               Assert.assertFalse(valIt.hasNext());
+               Assert.assertFalse(valIt.hasNext());
+               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+               
+               // no has next calls
+               
+               tList.clear();
+               tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(5)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(8)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(42)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(-1)));
+               tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(0)));
+               
+               expectedKey = 4;
+               expectedValues = new int[]{5,8,42,-1,0};
+               
+               valIt.set(tList.iterator());
+               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+               for(int expectedValue : expectedValues) {
+                       Assert.assertTrue(valIt.next().get() == expectedValue);
+               }
+               try {
+                       valIt.next();
+                       Assert.fail();
+               } catch (NoSuchElementException nsee) {
+                       // expected
+               }
+               Assert.assertFalse(valIt.hasNext());
+               Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java
new file mode 100644
index 0000000..d79afaa
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.test.hadoopcompatibility.mapreduce;
+
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.fail;
+
+
+
+public class HadoopInputFormatTest {
+
+
+       public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> {
+
+               public DummyVoidKeyInputFormat() {
+               }
+
+               @Override
+               public RecordReader<Void, T> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+                       return null;
+               }
+       }
+       
+       
+       @Test
+       public void checkTypeInformation() {
+               try {
+                       final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                       // Set up the Hadoop Input Format
+                       Job job = Job.getInstance();
+                       HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<Void, Long>( new DummyVoidKeyInputFormat(), Void.class, Long.class, job);
+
+                       TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType();
+                       TypeInformation<Tuple2<Void,Long>> testTupleType = new TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
+
+                       if(tupleType.isTupleType()) {
+                               if(!((TupleTypeInfo)tupleType).equals(testTupleType)) {
+                                       fail("Tuple type information was not set correctly!");
+                               }
+                       } else {
+                               fail("Type information was not set to tuple type information!");
+                       }
+
+               }
+               catch (Exception ex) {
+                       fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
+               }
+
+       }
+       
+}
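
For context, a short usage sketch of the HadoopInputFormat wrapper exercised by the test above, assuming a hypothetical HDFS input and output path (the class name HadoopInputFormatUsageSketch and the paths are illustrative placeholders, not part of this commit):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class HadoopInputFormatUsageSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// wrap Hadoop's mapreduce TextInputFormat; keys are byte offsets, values are text lines
		Job job = Job.getInstance();
		HadoopInputFormat<LongWritable, Text> hadoopIF =
				new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, job);
		FileInputFormat.addInputPath(job, new Path("hdfs:///tmp/input"));   // placeholder path

		// produced type is Tuple2<LongWritable, Text>, matching the type information checked above
		DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(hadoopIF);
		lines.writeAsText("hdfs:///tmp/output");   // placeholder path

		env.execute();
	}
}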

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
new file mode 100644
index 0000000..7eee629
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapreduce;
+
+import org.apache.flink.hadoopcompatibility.mapreduce.example.WordCount;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class HadoopInputOutputITCase extends JavaProgramTestBase {
+       
+       protected String textPath;
+       protected String resultPath;
+       
+       
+       @Override
+       protected void preSubmit() throws Exception {
+               textPath = createTempFile("text.txt", WordCountData.TEXT);
+               resultPath = getTempDirPath("result");
+               this.setDegreeOfParallelism(4);
+       }
+       
+       @Override
+       protected void postSubmit() throws Exception {
+               compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[]{".", "_"});
+       }
+       
+       @Override
+       protected void testProgram() throws Exception {
+               WordCount.main(new String[] { textPath, resultPath });
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/test/resources/log4j-test.properties b/flink-staging/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..0b686e5
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF and its only appender to A1.
+log4j.rootLogger=OFF, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/test/resources/logback-test.xml b/flink-staging/flink-hadoop-compatibility/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8b3bb27
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/pom.xml b/flink-staging/flink-hbase/pom.xml
new file mode 100644
index 0000000..223389c
--- /dev/null
+++ b/flink-staging/flink-hbase/pom.xml
@@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-staging</artifactId>
+               <version>0.9-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-hbase</artifactId>
+       <name>flink-hbase</name>
+       <packaging>jar</packaging>
+
+       <properties>
+               <hbase.hadoop1.version>0.98.6.1-hadoop1</hbase.hadoop1.version>
+               <hbase.hadoop2.version>0.98.6.1-hadoop2</hbase.hadoop2.version>
+       </properties>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-java</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-clients</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       <artifactId>hadoop-core</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+               
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-hadoop-compatibility</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               
+               <dependency>
+                       <groupId>org.apache.hbase</groupId>
+                       <artifactId>hbase-client</artifactId>
+                       <version>${hbase.version}</version>
+                       <exclusions>
+                               <!-- Remove unneeded dependency, which is conflicting with our jetty-util version. -->
+                               <exclusion>
+                                       <groupId>org.mortbay.jetty</groupId>
+                                       <artifactId>jetty-util</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+       </dependencies>
+
+       <profiles>
+               <profile>
+                       <id>hadoop-1</id>
+                       <activation>
+                               <property>
+                                       <!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
+                                       <!--hadoop1--><name>hadoop.profile</name><value>1</value>
+                               </property>
+                       </activation>
+                       <properties>
+                               <hbase.version>${hbase.hadoop1.version}</hbase.version>
+                       </properties>
+                       <dependencies>
+                               <!-- Force hadoop-common dependency -->
+                               <dependency>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       <artifactId>hadoop-core</artifactId>
+                               </dependency>
+                       </dependencies>
+               </profile>
+               
+               <profile>
+                       <id>hadoop-2</id>
+                       <activation>
+                               <property>
+                                       <!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
+                                       <!--hadoop2--><name>!hadoop.profile</name>
+                               </property>
+                       </activation>
+                       <properties>
+                               <hbase.version>${hbase.hadoop2.version}</hbase.version>
+                       </properties>
+                       <dependencies>
+                               <!-- Force hadoop-common dependency -->
+                               <dependency>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       <artifactId>hadoop-common</artifactId>
+                               </dependency>
+                       </dependencies>
+               </profile>
+               
+               <profile>
+                       <id>cdh5.1.3</id>
+                       <properties>
+                               <hadoop.profile>2</hadoop.profile>
+                               <hbase.version>0.98.1-cdh5.1.3</hbase.version>
+                               <hadoop.version>2.3.0-cdh5.1.3</hadoop.version>
+                               <!-- Cloudera uses different versions for hadoop core and commons -->
+                               <!-- This profile could be removed if Cloudera fixes this mismatch! -->
+                               <hadoop.core.version>2.3.0-mr1-cdh5.1.3</hadoop.core.version>
+                       </properties>
+                       <dependencyManagement>
+                               <dependencies>
+                                       <dependency>
+                                               <groupId>org.apache.hadoop</groupId>
+                                               <artifactId>hadoop-core</artifactId>
+                                               <version>${hadoop.core.version}</version>
+                                       </dependency>
+                               </dependencies>
+                       </dependencyManagement>
+                       <dependencies>
+                               <!-- Force hadoop-common dependency -->
+                               <dependency>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       <artifactId>hadoop-common</artifactId>
+                                       <version>${hadoop.version}</version>
+                               </dependency>
+                               <dependency>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       <artifactId>hadoop-mapreduce-client-core</artifactId>
+                                       <version>${hadoop.version}</version>
+                               </dependency>
+                       </dependencies>
+               </profile>
+
+       </profiles>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-staging/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
new file mode 100755
index 0000000..9c861ed
--- /dev/null
+++ b/flink-staging/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.addons.hbase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables.
+ *
+ */
+public abstract class TableInputFormat<T extends Tuple> implements InputFormat<T, TableInputSplit>{
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = LoggerFactory.getLogger(TableInputFormat.class);
+
+       /** helper variable to decide whether the input is exhausted or not */
+       private boolean endReached = false;
+
+       // TODO table and scan could be serialized once the Kryo serializer becomes the default
+       protected transient HTable table;
+       protected transient Scan scan;
+
+       /** HBase iterator wrapper */
+       private ResultScanner rs;
+
+       private byte[] lastRow;
+       private int scannedRows;
+
+       // abstract methods allow for multiple tables and scanners in the same job
+       protected abstract Scan getScanner();
+       protected abstract String getTableName();
+       protected abstract T mapResultToTuple(Result r);
+
+       /**
+        * creates a {@link Scan} object and a {@link HTable} connection
+        *
+        * @param parameters
+        * @see Configuration
+        */
+       @Override
+       public void configure(Configuration parameters) {
+               this.table = createTable();
+               this.scan = getScanner();
+       }
+
+       /** Create an {@link HTable} instance and set it into this format */
+       private HTable createTable() {
+               LOG.info("Initializing HBaseConfiguration");
+               //use files found in the classpath
+               org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
+
+               try {
+                       return new HTable(hConf, getTableName());
+               } catch (Exception e) {
+                       LOG.error("Error instantiating a new HTable instance", e);
+               }
+               return null;
+       }
+
+       @Override
+       public boolean reachedEnd() throws IOException {
+               return this.endReached;
+       }
+
+       @Override
+       public T nextRecord(T reuse) throws IOException {
+               if (this.rs == null){
+                       throw new IOException("No table result scanner provided!");
+               }
+               try{
+                       Result res = this.rs.next();
+                       if (res != null){
+                               scannedRows++;
+                               lastRow = res.getRow();
+                               return mapResultToTuple(res);
+                       }
+               }catch (Exception e) {
+                       this.rs.close();
+                       //workaround for timeout on scan
+                       StringBuffer logMsg = new StringBuffer("Error after scan of ")
+                                       .append(scannedRows)
+                                       .append(" rows. Retry with a new scanner...");
+                       LOG.warn(logMsg.toString(), e);
+                       this.scan.setStartRow(lastRow);
+                       this.rs = table.getScanner(scan);
+                       Result res = this.rs.next();
+                       if (res != null) {
+                               scannedRows++;
+                               lastRow = res.getRow();
+                               return mapResultToTuple(res);
+                       }
+               }
+
+               this.endReached = true;
+               return null;
+       }
+
+       @Override
+       public void open(TableInputSplit split) throws IOException {
+               if (split == null){
+                       throw new IOException("Input split is null!");
+               }
+               if (table == null){
+                       throw new IOException("No HTable provided!");
+               }
+               if (scan == null){
+                       throw new IOException("No Scan instance provided");
+               }
+
+               logSplitInfo("opening", split);
+               scan.setStartRow(split.getStartRow());
+               lastRow = split.getEndRow();
+               scan.setStopRow(lastRow);
+
+               this.rs = table.getScanner(scan);
+               this.endReached = false;
+               this.scannedRows = 0;
+       }
+
+       @Override
+       public void close() throws IOException {
+               if(rs!=null){
+                       this.rs.close();
+               }
+               if(table!=null){
+                       this.table.close();
+               }
+               LOG.info("Closing split (scanned {} rows)", scannedRows);
+               this.lastRow = null;
+       }
+
+       @Override
+       public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
+               //Gets the starting and ending row keys for every region in the currently open table
+               final Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
+               if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
+                       throw new IOException("Expecting at least one region.");
+               }
+               final byte[] startRow = scan.getStartRow();
+               final byte[] stopRow = scan.getStopRow();
+               final boolean scanWithNoLowerBound = startRow.length == 0;
+               final boolean scanWithNoUpperBound = stopRow.length == 0;
+
+               final List<TableInputSplit> splits = new ArrayList<TableInputSplit>(minNumSplits);
+               for (int i = 0; i < keys.getFirst().length; i++) {
+                       final byte[] startKey = keys.getFirst()[i];
+                       final byte[] endKey = keys.getSecond()[i];
+                       final String regionLocation = table.getRegionLocation(startKey, false).getHostnamePort();
+                       //Test if the given region is to be included in the InputSplit while splitting the regions of a table
+                       if (!includeRegionInSplit(startKey, endKey)) {
+                               continue;
+                       }
+                       //Finds the region on which the given row is being served
+                       final String[] hosts = new String[] { regionLocation };
+
+                       // determine if the region contains keys used by the scan
+                       boolean isLastRegion = endKey.length == 0;
+                       if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0) &&
+                                       (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {
+
+                               final byte[] splitStart = scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0 ? startKey : startRow;
+                               final byte[] splitStop = (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0)
+                                               && !isLastRegion ? endKey : stopRow;
+                               int id = splits.size();
+                               final TableInputSplit split = new TableInputSplit(id, hosts, table.getTableName(), splitStart, splitStop);
+                               splits.add(split);
+                       }
+               }
+               LOG.info("Created " + splits.size() + " splits");
+               for (TableInputSplit split : splits) {
+                       logSplitInfo("created", split);
+               }
+               return splits.toArray(new TableInputSplit[0]);
+       }
+
+       private void logSplitInfo(String action, TableInputSplit split) {
+               int splitId = split.getSplitNumber();
+               String splitStart = Bytes.toString(split.getStartRow());
+               String splitEnd = Bytes.toString(split.getEndRow());
+               String splitStartKey = splitStart.isEmpty() ? "-" : splitStart;
+               String splitStopKey = splitEnd.isEmpty() ? "-" : splitEnd;
+               String[] hostnames = split.getHostnames();
+               LOG.info("{} split [{}|{}|{}|{}]", action, splitId, hostnames, splitStartKey, splitStopKey);
+       }
+
+       /**
+        * Test if the given region is to be included in the InputSplit while splitting the regions of a table.
+        * <p>
+        * This optimization is effective when there is a specific reasoning to exclude an entire region from the M-R job,
+        * (and hence, not contributing to the InputSplit), given the start and end keys of the same. <br>
+        * Useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R
+        * processing, continuously. In addition to reducing InputSplits, reduces the load on the region server as well, due
+        * to the ordering of the keys. <br>
+        * <br>
+        * Note: It is possible that <code>endKey.length() == 0</code>, for the last (recent) region. <br>
+        * Override this method, if you want to bulk exclude regions altogether from M-R. By default, no region is excluded
+        * (i.e. all regions are included).
+        *
+        * @param startKey
+        *        Start key of the region
+        * @param endKey
+        *        End key of the region
+        * @return true, if this region needs to be included as part of the input (default).
+        */
+       private static boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
+               return true;
+       }
+
+       @Override
+       public InputSplitAssigner getInputSplitAssigner(TableInputSplit[] inputSplits) {
+               return new LocatableInputSplitAssigner(inputSplits);
+       }
+
+       @Override
+       public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
+               return null;
+       }
+
+}
\ No newline at end of file
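
The three abstract methods above (getScanner(), getTableName(), mapResultToTuple(Result)) are all a concrete format has to supply. A minimal sketch of such a subclass, assuming a hypothetical "books" table with a "cf:title" column (the class name BookTableInputFormat is likewise a placeholder, not part of this commit):

import org.apache.flink.addons.hbase.TableInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class BookTableInputFormat extends TableInputFormat<Tuple2<String, String>> {

	private static final long serialVersionUID = 1L;

	@Override
	protected Scan getScanner() {
		// restrict the scan to the single column that is mapped into the tuple
		Scan scan = new Scan();
		scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("title"));
		return scan;
	}

	@Override
	protected String getTableName() {
		return "books";   // placeholder table name
	}

	@Override
	protected Tuple2<String, String> mapResultToTuple(Result r) {
		// key = row key, value = the "cf:title" cell
		String key = Bytes.toString(r.getRow());
		String title = Bytes.toString(r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("title")));
		return new Tuple2<String, String>(key, title);
	}
}

Such a format could then be consumed via ExecutionEnvironment#createInput(InputFormat).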

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java b/flink-staging/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
new file mode 100644
index 0000000..6d8bf42
--- /dev/null
+++ b/flink-staging/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.LocatableInputSplit;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/**
+ * This class implements an input split for HBase. Each table input split corresponds to a key range (low, high). All
+ * references to "row" below refer to the key of the row.
+ */
+public class TableInputSplit extends LocatableInputSplit {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * The name of the table to retrieve data from
+        */
+       private byte[] tableName;
+
+       /**
+        * The start row of the split.
+        */
+       private byte[] startRow;
+
+       /**
+        * The end row of the split.
+        */
+       private byte[] endRow;
+
+       /**
+        * Creates a new table input split
+        * 
+        * @param splitNumber
+        *        the number of the input split
+        * @param hostnames
+        *        the names of the hosts storing the data the input split refers to
+        * @param tableName
+        *        the name of the table to retrieve data from
+        * @param startRow
+        *        the start row of the split
+        * @param endRow
+        *        the end row of the split
+        */
+       TableInputSplit(final int splitNumber, final String[] hostnames, final byte[] tableName, final byte[] startRow,
+                       final byte[] endRow) {
+               super(splitNumber, hostnames);
+
+               this.tableName = tableName;
+               this.startRow = startRow;
+               this.endRow = endRow;
+       }
+
+       /**
+        * Default constructor for serialization/deserialization.
+        */
+       public TableInputSplit() {
+               super();
+
+               this.tableName = null;
+               this.startRow = null;
+               this.endRow = null;
+       }
+
+       /**
+        * Returns the table name.
+        * 
+        * @return The table name.
+        */
+       public byte[] getTableName() {
+               return this.tableName;
+       }
+
+       /**
+        * Returns the start row.
+        * 
+        * @return The start row.
+        */
+       public byte[] getStartRow() {
+               return this.startRow;
+       }
+
+       /**
+        * Returns the end row.
+        * 
+        * @return The end row.
+        */
+       public byte[] getEndRow() {
+               return this.endRow;
+       }
+
+
+       @Override
+       public void write(final DataOutputView out) throws IOException {
+
+               super.write(out);
+
+               // Write the table name
+               if (this.tableName == null) {
+                       out.writeInt(-1);
+               } else {
+                       out.writeInt(this.tableName.length);
+                       out.write(this.tableName);
+               }
+
+               // Write the start row
+               if (this.startRow == null) {
+                       out.writeInt(-1);
+               } else {
+                       out.writeInt(this.startRow.length);
+                       out.write(this.startRow);
+               }
+
+               // Write the end row
+               if (this.endRow == null) {
+                       out.writeInt(-1);
+               } else {
+                       out.writeInt(this.endRow.length);
+                       out.write(this.endRow);
+               }
+       }
+
+
+       @Override
+       public void read(final DataInputView in) throws IOException {
+
+               super.read(in);
+
+               // Read the table name
+               int len = in.readInt();
+               if (len >= 0) {
+                       this.tableName = new byte[len];
+                       in.readFully(this.tableName);
+               }
+
+               // Read the start row
+               len = in.readInt();
+               if (len >= 0) {
+                       this.startRow = new byte[len];
+                       in.readFully(this.startRow);
+               }
+
+               // Read the end row
+               len = in.readInt();
+               if (len >= 0) {
+                       this.endRow = new byte[len];
+                       in.readFully(this.endRow);
+               }
+       }
+}

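The write()/read() pair above serializes each byte[] field with a length prefix, writing -1 for a null array and otherwise the length followed by the bytes. The round-trip sketch below restates that convention with plain java.io streams purely for illustration; it is a hypothetical helper, not part of this commit, and Flink's DataOutputView/DataInputView are deliberately replaced by DataOutputStream/DataInputStream.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical illustration of TableInputSplit's length-prefixed byte[] layout.
public final class LengthPrefixSketch {

	// Writes -1 for a null array, otherwise the length followed by the bytes.
	static void writeByteArray(DataOutputStream out, byte[] value) throws IOException {
		if (value == null) {
			out.writeInt(-1);
		} else {
			out.writeInt(value.length);
			out.write(value);
		}
	}

	// Mirrors read(): a negative length leaves the field null.
	static byte[] readByteArray(DataInputStream in) throws IOException {
		int len = in.readInt();
		if (len < 0) {
			return null;
		}
		byte[] value = new byte[len];
		in.readFully(value);
		return value;
	}

	public static void main(String[] args) throws IOException {
		ByteArrayOutputStream buffer = new ByteArrayOutputStream();
		DataOutputStream out = new DataOutputStream(buffer);
		writeByteArray(out, "test-table".getBytes()); // table name
		writeByteArray(out, null);                    // open-ended start row -> -1 marker
		writeByteArray(out, "row-100".getBytes());    // end row
		out.flush();

		DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
		System.out.println(new String(readByteArray(in))); // prints: test-table
		System.out.println(readByteArray(in));             // prints: null
		System.out.println(new String(readByteArray(in))); // prints: row-100
	}
}
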
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
 
b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
new file mode 100755
index 0000000..b6f345a
--- /dev/null
+++ 
b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.TableInputFormat;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Simple example of reading a DataSet from HBase.
+ * 
+ * To run it, first create the test table with the HBase shell.
+ * 
+ * Use the following commands:
+ * <ul>
+ *     <li>create 'test-table', 'someCf'</li>
+ *     <li>put 'test-table', '1', 'someCf:someQual', 'someString'</li>
+ *     <li>put 'test-table', '2', 'someCf:someQual', 'anotherString'</li>
+ * </ul>
+ * 
+ * The example should print only the first entry.
+ * 
+ */
+public class HBaseReadExample {
+       public static void main(String[] args) throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               @SuppressWarnings("serial")
+               DataSet<Tuple2<String, String>> hbaseDs = env.createInput(new TableInputFormat<Tuple2<String, String>>() {
+                       private final byte[] CF_SOME = "someCf".getBytes();
+                       private final byte[] Q_SOME = "someQual".getBytes();
+                               @Override
+                               public String getTableName() {
+                                       return "test-table";
+                               }
+
+                               @Override
+                               protected Scan getScanner() {
+                                       Scan scan = new Scan();
+                                       scan.addColumn(CF_SOME, Q_SOME);
+                                       return scan;
+                               }
+
+                               private Tuple2<String, String> reuse = new Tuple2<String, String>();
+                               
+                               @Override
+                               protected Tuple2<String, String> mapResultToTuple(Result r) {
+                                       String key = Bytes.toString(r.getRow());
+                                       String val = Bytes.toString(r.getValue(CF_SOME, Q_SOME));
+                                       reuse.setField(key, 0);
+                                       reuse.setField(val, 1);
+                                       return reuse;
+                               }
+               })
+               .filter(new FilterFunction<Tuple2<String,String>>() {
+
+                       @Override
+                       public boolean filter(Tuple2<String, String> t) throws Exception {
+                               String val = t.getField(1);
+                               return val.startsWith("someStr");
+                       }
+               });
+               
+               hbaseDs.print();
+               
+               // kick off execution.
+               env.execute();
+                               
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hbase/src/test/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/src/test/resources/hbase-site.xml 
b/flink-staging/flink-hbase/src/test/resources/hbase-site.xml
new file mode 100644
index 0000000..2984063
--- /dev/null
+++ b/flink-staging/flink-hbase/src/test/resources/hbase-site.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+
+  <property>
+    <name>hbase.tmp.dir</name>
+    <!-- 
+    <value>/media/Dati/hbase-0.98-data</value>
+    --> 
+    <value>/opt/hbase-0.98.6.1-hadoop2/data</value>
+
+  </property>
+  <property>
+    <name>hbase.zookeeper.quorum</name>
+    <value>localhost</value>
+  </property>
+    <!-- 
+  <property>
+    <name>hadoop.security.group.mapping</name>
+    <value>org.apache.hadoop.security.ShellBasedUnixGroupsMapping</value>
+  </property>
+  -->
+</configuration>

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hbase/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/src/test/resources/log4j.properties 
b/flink-staging/flink-hbase/src/test/resources/log4j.properties
new file mode 100755
index 0000000..d6eb2b2
--- /dev/null
+++ b/flink-staging/flink-hbase/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+# http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+log4j.rootLogger=${hadoop.root.logger}
+hadoop.root.logger=INFO,console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-jdbc/pom.xml b/flink-staging/flink-jdbc/pom.xml
new file mode 100644
index 0000000..7182984
--- /dev/null
+++ b/flink-staging/flink-jdbc/pom.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+       
+       <modelVersion>4.0.0</modelVersion>
+       
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-staging</artifactId>
+               <version>0.9-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-jdbc</artifactId>
+       <name>flink-jdbc</name>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-java</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+                               
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-clients</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.derby</groupId>
+                       <artifactId>derby</artifactId>
+                       <version>10.10.1.1</version>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
 
b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
new file mode 100644
index 0000000..3cfaeb9
--- /dev/null
+++ 
b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.flink.api.common.io.NonParallelInput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.types.NullValue;
+
+/**
+ * InputFormat to read data from a database and generate tuples.
+ * The InputFormat has to be configured using the supplied JDBCInputFormatBuilder.
+ * 
+ * @param <OUT> the type of Tuple produced; its field types must match the column types of the query result
+ * @see Tuple
+ * @see DriverManager
+ */
+public class JDBCInputFormat<OUT extends Tuple> implements InputFormat<OUT, InputSplit>, NonParallelInput {
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class);
+
+       private String username;
+       private String password;
+       private String drivername;
+       private String dbURL;
+       private String query;
+
+       private transient Connection dbConn;
+       private transient Statement statement;
+       private transient ResultSet resultSet;
+
+       private int[] columnTypes = null;
+
+       public JDBCInputFormat() {
+       }
+
+       @Override
+       public void configure(Configuration parameters) {
+       }
+
+       /**
+        * Connects to the source database and executes the query.
+        *
+        * @param ignored the input split is ignored, since this input format is non-parallel
+        * @throws IOException
+        */
+       @Override
+       public void open(InputSplit ignored) throws IOException {
+               try {
+                       establishConnection();
+                       statement = dbConn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
+                       resultSet = statement.executeQuery(query);
+               } catch (SQLException se) {
+                       close();
+                       throw new IllegalArgumentException("open() failed. " + se.getMessage(), se);
+               } catch (ClassNotFoundException cnfe) {
+                       throw new IllegalArgumentException("JDBC driver class not found. " + cnfe.getMessage(), cnfe);
+               }
+       }
+
+       private void establishConnection() throws SQLException, ClassNotFoundException {
+               Class.forName(drivername);
+               if (username == null) {
+                       dbConn = DriverManager.getConnection(dbURL);
+               } else {
+                       dbConn = DriverManager.getConnection(dbURL, username, password);
+               }
+       }
+
+       /**
+        * Closes all resources used.
+        *
+        * @throws IOException Indicates that a resource could not be closed.
+        */
+       @Override
+       public void close() throws IOException {
+               try {
+                       resultSet.close();
+               } catch (SQLException se) {
+                       LOG.info("InputFormat couldn't be closed - " + se.getMessage());
+               } catch (NullPointerException npe) {
+               }
+               try {
+                       statement.close();
+               } catch (SQLException se) {
+                       LOG.info("InputFormat couldn't be closed - " + se.getMessage());
+               } catch (NullPointerException npe) {
+               }
+               try {
+                       dbConn.close();
+               } catch (SQLException se) {
+                       LOG.info("InputFormat couldn't be closed - " + se.getMessage());
+               } catch (NullPointerException npe) {
+               }
+       }
+
+       /**
+        * Checks whether all data has been read.
+        *
+        * @return boolean value indicating whether all data has been read.
+        * @throws IOException
+        */
+       @Override
+       public boolean reachedEnd() throws IOException {
+               try {
+                       if (resultSet.isLast()) {
+                               close();
+                               return true;
+                       }
+                       return false;
+               } catch (SQLException se) {
+                       throw new IOException("Couldn't evaluate reachedEnd() - " + se.getMessage(), se);
+               }
+       }
+
+       /**
+        * Stores the next resultSet row in a tuple.
+        *
+        * @param tuple the reuse object into which the next row is written
+        * @return tuple containing the next row
+        * @throws java.io.IOException
+        */
+       @Override
+       public OUT nextRecord(OUT tuple) throws IOException {
+               try {
+                       resultSet.next();
+                       if (columnTypes == null) {
+                               extractTypes(tuple);
+                       }
+                       addValue(tuple);
+                       return tuple;
+               } catch (SQLException se) {
+                       close();
+                       throw new IOException("Couldn't read data - " + se.getMessage(), se);
+               } catch (NullPointerException npe) {
+                       close();
+                       throw new IOException("Couldn't access resultSet", npe);
+               }
+       }
+
+       private void extractTypes(OUT tuple) throws SQLException, IOException {
+               ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+               columnTypes = new int[resultSetMetaData.getColumnCount()];
+               if (tuple.getArity() != columnTypes.length) {
+                       close();
+                       throw new IOException("Tuple size does not match column count");
+               }
+               for (int pos = 0; pos < columnTypes.length; pos++) {
+                       columnTypes[pos] = resultSetMetaData.getColumnType(pos + 1);
+               }
+       }
+
+       /**
+        * Copies the column values of the current resultSet row into the given reuse tuple.
+        *
+        * @param reuse Target tuple.
+        */
+       private void addValue(OUT reuse) throws SQLException {
+               for (int pos = 0; pos < columnTypes.length; pos++) {
+                       switch (columnTypes[pos]) {
+                               case java.sql.Types.NULL:
+                                       reuse.setField(NullValue.getInstance(), 
pos);
+                                       break;
+                               case java.sql.Types.BOOLEAN:
+                                       reuse.setField(resultSet.getBoolean(pos 
+ 1), pos);
+                                       break;
+                               case java.sql.Types.BIT:
+                                       reuse.setField(resultSet.getBoolean(pos 
+ 1), pos);
+                                       break;
+                               case java.sql.Types.CHAR:
+                                       reuse.setField(resultSet.getString(pos 
+ 1), pos);
+                                       break;
+                               case java.sql.Types.NCHAR:
+                                       reuse.setField(resultSet.getString(pos 
+ 1), pos);
+                                       break;
+                               case java.sql.Types.VARCHAR:
+                                       reuse.setField(resultSet.getString(pos 
+ 1), pos);
+                                       break;
+                               case java.sql.Types.LONGVARCHAR:
+                                       reuse.setField(resultSet.getString(pos 
+ 1), pos);
+                                       break;
+                               case java.sql.Types.LONGNVARCHAR:
+                                       reuse.setField(resultSet.getString(pos 
+ 1), pos);
+                                       break;
+                               case java.sql.Types.TINYINT:
+                                       reuse.setField(resultSet.getShort(pos + 
1), pos);
+                                       break;
+                               case java.sql.Types.SMALLINT:
+                                       reuse.setField(resultSet.getShort(pos + 
1), pos);
+                                       break;
+                               case java.sql.Types.BIGINT:
+                                       reuse.setField(resultSet.getLong(pos + 
1), pos);
+                                       break;
+                               case java.sql.Types.INTEGER:
+                                       reuse.setField(resultSet.getInt(pos + 
1), pos);
+                                       break;
+                               case java.sql.Types.FLOAT:
+                                       reuse.setField(resultSet.getDouble(pos 
+ 1), pos);
+                                       break;
+                               case java.sql.Types.REAL:
+                                       reuse.setField(resultSet.getFloat(pos + 
1), pos);
+                                       break;
+                               case java.sql.Types.DOUBLE:
+                                       reuse.setField(resultSet.getDouble(pos 
+ 1), pos);
+                                       break;
+                               case java.sql.Types.DECIMAL:
+                                       
reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos);
+                                       break;
+                               case java.sql.Types.NUMERIC:
+                                       
reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos);
+                                       break;
+                               case java.sql.Types.DATE:
+                                       reuse.setField(resultSet.getDate(pos + 
1).toString(), pos);
+                                       break;
+                               case java.sql.Types.TIME:
+                                       reuse.setField(resultSet.getTime(pos + 
1).getTime(), pos);
+                                       break;
+                               case java.sql.Types.TIMESTAMP:
+                                       
reuse.setField(resultSet.getTimestamp(pos + 1).toString(), pos);
+                                       break;
+                               case java.sql.Types.SQLXML:
+                                       reuse.setField(resultSet.getSQLXML(pos 
+ 1).toString(), pos);
+                                       break;
+                               default:
+                                       throw new SQLException("Unsupported 
sql-type [" + columnTypes[pos] + "] on column [" + pos + "]");
+
+                               // case java.sql.Types.BINARY:
+                               // case java.sql.Types.VARBINARY:
+                               // case java.sql.Types.LONGVARBINARY:
+                               // case java.sql.Types.ARRAY:
+                               // case java.sql.Types.JAVA_OBJECT:
+                               // case java.sql.Types.BLOB:
+                               // case java.sql.Types.CLOB:
+                               // case java.sql.Types.NCLOB:
+                               // case java.sql.Types.DATALINK:
+                               // case java.sql.Types.DISTINCT:
+                               // case java.sql.Types.OTHER:
+                               // case java.sql.Types.REF:
+                               // case java.sql.Types.ROWID:
+                               // case java.sql.Types.STRUCT:
+                       }
+               }
+       }
+
+       @Override
+       public BaseStatistics getStatistics(BaseStatistics cachedStatistics) 
throws IOException {
+               return cachedStatistics;
+       }
+
+       @Override
+       public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
+               GenericInputSplit[] split = {
+                       new GenericInputSplit(0, 1)
+               };
+               return split;
+       }
+
+       @Override
+       public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
+               return new DefaultInputSplitAssigner(inputSplits);
+       }
+       
+
+       /**
+        * A builder used to set parameters of the input format's configuration in a fluent way.
+        * @return builder
+        */
+       public static JDBCInputFormatBuilder buildJDBCInputFormat() {
+               return new JDBCInputFormatBuilder();
+       }
+
+       public static class JDBCInputFormatBuilder {
+               private final JDBCInputFormat format;
+
+               public JDBCInputFormatBuilder() {
+                       this.format = new JDBCInputFormat();
+               }
+
+               public JDBCInputFormatBuilder setUsername(String username) {
+                       format.username = username;
+                       return this;
+               }
+
+               public JDBCInputFormatBuilder setPassword(String password) {
+                       format.password = password;
+                       return this;
+               }
+
+               public JDBCInputFormatBuilder setDrivername(String drivername) {
+                       format.drivername = drivername;
+                       return this;
+               }
+
+               public JDBCInputFormatBuilder setDBUrl(String dbURL) {
+                       format.dbURL = dbURL;
+                       return this;
+               }
+
+               public JDBCInputFormatBuilder setQuery(String query) {
+                       format.query = query;
+                       return this;
+               }
+
+               public JDBCInputFormat finish() {
+                       if (format.username == null) {
+                               LOG.info("Username was not supplied separately.");
+                       }
+                       if (format.password == null) {
+                               LOG.info("Password was not supplied separately.");
+                       }
+                       if (format.dbURL == null) {
+                               throw new IllegalArgumentException("No database URL supplied.");
+                       }
+                       if (format.query == null) {
+                               throw new IllegalArgumentException("No query supplied.");
+                       }
+                       if (format.drivername == null) {
+                               throw new IllegalArgumentException("No driver supplied.");
+                       }
+                       return format;
+               }
+       }
+
+}

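A minimal read-side usage sketch for the builder above. Assumptions: an embedded Derby database named ebookshop with a table books(id INT, title VARCHAR) already exists in the same JVM, similar to what the JDBCExample added later in this commit sets up, and the tuple field types line up with the column types handled in addValue(). As in that example, finish() returns a raw-typed format, so the createInput() call compiles with an unchecked warning.

import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

// Hypothetical usage sketch, not part of this commit.
public class JDBCReadSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Read (id, title) rows; INTEGER maps to getInt(), VARCHAR to getString().
		DataSet<Tuple2<Integer, String>> rows = env.createInput(
				JDBCInputFormat.buildJDBCInputFormat()
						.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
						.setDBUrl("jdbc:derby:memory:ebookshop")
						.setQuery("select id, title from books")
						.finish(),
				new TupleTypeInfo<Tuple2<Integer, String>>(INT_TYPE_INFO, STRING_TYPE_INFO));

		rows.print();
		env.execute();
	}
}
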
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
 
b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
new file mode 100644
index 0000000..6771772
--- /dev/null
+++ 
b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * OutputFormat to write tuples into a database.
+ * The OutputFormat has to be configured using the supplied JDBCOutputFormatBuilder.
+ * 
+ * @param <OUT> the type of Tuple written; its field types must match the parameters of the prepared statement
+ * @see Tuple
+ * @see DriverManager
+ */
+public class JDBCOutputFormat<OUT extends Tuple> implements OutputFormat<OUT> {
+       private static final long serialVersionUID = 1L;
+
+       @SuppressWarnings("unused")
+       private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class);
+
+       private String username;
+       private String password;
+       private String drivername;
+       private String dbURL;
+       private String query;
+       private int batchInterval = 5000;
+
+       private Connection dbConn;
+       private PreparedStatement upload;
+
+       private SupportedTypes[] types = null;
+
+       private int batchCount = 0;
+
+       public JDBCOutputFormat() {
+       }
+
+       @Override
+       public void configure(Configuration parameters) {
+       }
+
+       /**
+        * Connects to the target database and initializes the prepared statement.
+        *
+        * @param taskNumber The number of this parallel instance.
+        * @param numTasks The total number of parallel instances.
+        * @throws IOException Thrown, if the output could not be opened due to an I/O problem.
+        */
+       @Override
+       public void open(int taskNumber, int numTasks) throws IOException {
+               try {
+                       establishConnection();
+                       upload = dbConn.prepareStatement(query);
+               } catch (SQLException sqe) {
+                       close();
+                       throw new IllegalArgumentException("open() failed.", sqe);
+               } catch (ClassNotFoundException cnfe) {
+                       close();
+                       throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
+               }
+       }
+
+       private void establishConnection() throws SQLException, ClassNotFoundException {
+               Class.forName(drivername);
+               if (username == null) {
+                       dbConn = DriverManager.getConnection(dbURL);
+               } else {
+                       dbConn = DriverManager.getConnection(dbURL, username, password);
+               }
+       }
+
+       private enum SupportedTypes {
+               BOOLEAN,
+               BYTE,
+               SHORT,
+               INTEGER,
+               LONG,
+               STRING,
+               FLOAT,
+               DOUBLE
+       }
+
+       /**
+        * Adds a record to the prepared statement.
+        * <p>
+        * When this method is called, the output format is guaranteed to be opened.
+        *
+        * @param tuple The record to add to the output.
+        * @throws IOException Thrown, if the record could not be added due to an I/O problem.
+        */
+       @Override
+       public void writeRecord(OUT tuple) throws IOException {
+               try {
+                       if (types == null) {
+                               extractTypes(tuple);
+                       }
+                       addValues(tuple);
+                       upload.addBatch();
+                       batchCount++;
+                       if (batchCount >= batchInterval) {
+                               upload.executeBatch();
+                               batchCount = 0;
+                       }
+               } catch (SQLException sqe) {
+                       close();
+                       throw new IllegalArgumentException("writeRecord() failed", sqe);
+               } catch (IllegalArgumentException iae) {
+                       close();
+                       throw new IllegalArgumentException("writeRecord() failed", iae);
+               }
+       }
+
+       private void extractTypes(OUT tuple) {
+               types = new SupportedTypes[tuple.getArity()];
+               for (int x = 0; x < tuple.getArity(); x++) {
+                       types[x] = SupportedTypes.valueOf(tuple.getField(x).getClass().getSimpleName().toUpperCase());
+               }
+       }
+
+       private void addValues(OUT tuple) throws SQLException {
+               for (int index = 0; index < tuple.getArity(); index++) {
+                       switch (types[index]) {
+                               case BOOLEAN:
+                                       upload.setBoolean(index + 1, (Boolean) 
tuple.getField(index));
+                                       break;
+                               case BYTE:
+                                       upload.setByte(index + 1, (Byte) 
tuple.getField(index));
+                                       break;
+                               case SHORT:
+                                       upload.setShort(index + 1, (Short) 
tuple.getField(index));
+                                       break;
+                               case INTEGER:
+                                       upload.setInt(index + 1, (Integer) 
tuple.getField(index));
+                                       break;
+                               case LONG:
+                                       upload.setLong(index + 1, (Long) 
tuple.getField(index));
+                                       break;
+                               case STRING:
+                                       upload.setString(index + 1, (String) 
tuple.getField(index));
+                                       break;
+                               case FLOAT:
+                                       upload.setFloat(index + 1, (Float) 
tuple.getField(index));
+                                       break;
+                               case DOUBLE:
+                                       upload.setDouble(index + 1, (Double) 
tuple.getField(index));
+                                       break;
+                       }
+               }
+       }
+
+       /**
+        * Executes prepared statement and closes all resources of this 
instance.
+        *
+        * @throws IOException Thrown, if the resources could not be closed properly.
+        */
+       @Override
+       public void close() throws IOException {
+               try {
+                       upload.executeBatch();
+                       batchCount = 0;
+               } catch (SQLException se) {
+                       throw new IllegalArgumentException("close() failed", se);
+               } catch (NullPointerException se) {
+               }
+               try {
+                       upload.close();
+               } catch (SQLException se) {
+                       LOG.info("JDBC output format couldn't be closed - " + se.getMessage());
+               } catch (NullPointerException npe) {
+               }
+               try {
+                       dbConn.close();
+               } catch (SQLException se) {
+                       LOG.info("JDBC output format couldn't be closed - " + se.getMessage());
+               } catch (NullPointerException npe) {
+               }
+       }
+
+       public static JDBCOutputFormatBuilder buildJDBCOutputFormat() {
+               return new JDBCOutputFormatBuilder();
+       }
+
+       public static class JDBCOutputFormatBuilder {
+               private final JDBCOutputFormat format;
+
+               protected JDBCOutputFormatBuilder() {
+                       this.format = new JDBCOutputFormat();
+               }
+
+               public JDBCOutputFormatBuilder setUsername(String username) {
+                       format.username = username;
+                       return this;
+               }
+
+               public JDBCOutputFormatBuilder setPassword(String password) {
+                       format.password = password;
+                       return this;
+               }
+
+               public JDBCOutputFormatBuilder setDrivername(String drivername) {
+                       format.drivername = drivername;
+                       return this;
+               }
+
+               public JDBCOutputFormatBuilder setDBUrl(String dbURL) {
+                       format.dbURL = dbURL;
+                       return this;
+               }
+
+               public JDBCOutputFormatBuilder setQuery(String query) {
+                       format.query = query;
+                       return this;
+               }
+
+               public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) {
+                       format.batchInterval = batchInterval;
+                       return this;
+               }
+
+               /**
+                * Finalizes the configuration and checks validity.
+                *
+                * @return Configured JDBCOutputFormat
+                */
+               public JDBCOutputFormat finish() {
+                       if (format.username == null) {
+                               LOG.info("Username was not supplied separately.");
+                       }
+                       if (format.password == null) {
+                               LOG.info("Password was not supplied separately.");
+                       }
+                       if (format.dbURL == null) {
+                               throw new IllegalArgumentException("No database URL supplied.");
+                       }
+                       if (format.query == null) {
+                               throw new IllegalArgumentException("No query supplied.");
+                       }
+                       if (format.drivername == null) {
+                               throw new IllegalArgumentException("No driver supplied.");
+                       }
+                       return format;
+               }
+       }
+
+}

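And a matching write-side sketch for the output format's builder above. Assumptions: a hypothetical Derby table mybooks(id INT, title VARCHAR) already exists; the '?' placeholders must match the tuple arity and field order, and setBatchInterval(1) merely forces a flush after every record instead of the default of 5000.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;

// Hypothetical usage sketch, not part of this commit.
public class JDBCWriteSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<Integer, String>> rows = env.fromElements(
				new Tuple2<Integer, String>(1, "Java for dummies"),
				new Tuple2<Integer, String>(2, "More Java for dummies"));

		rows.output(JDBCOutputFormat.buildJDBCOutputFormat()
				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
				.setDBUrl("jdbc:derby:memory:ebookshop")
				.setQuery("insert into mybooks (id, title) values (?, ?)")
				.setBatchInterval(1) // flush after every record; default is 5000
				.finish());

		env.execute();
	}
}
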
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
 
b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
new file mode 100644
index 0000000..7b012ba
--- /dev/null
+++ 
b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.example;
+
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
+import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
+public class JDBCExample {
+
+       public static void main(String[] args) throws Exception {
+               prepareTestDb();
+
+               ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple5> source
+                               = environment.createInput(JDBCInputFormat.buildJDBCInputFormat()
+                                               .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+                                               .setDBUrl("jdbc:derby:memory:ebookshop")
+                                               .setQuery("select * from books")
+                                               .finish(),
+                                               new TupleTypeInfo(Tuple5.class, INT_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO, DOUBLE_TYPE_INFO, INT_TYPE_INFO)
+                               );
+
+               source.output(JDBCOutputFormat.buildJDBCOutputFormat()
+                               .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+                               .setDBUrl("jdbc:derby:memory:ebookshop")
+                               .setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
+                               .finish());
+               environment.execute();
+       }
+
+       private static void prepareTestDb() throws Exception {
+               String dbURL = "jdbc:derby:memory:ebookshop;create=true";
+               Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+               Connection conn = DriverManager.getConnection(dbURL);
+
+               StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
+               sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
+               sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
+               sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
+               sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
+               sqlQueryBuilder.append("qty INT DEFAULT NULL,");
+               sqlQueryBuilder.append("PRIMARY KEY (id))");
+
+               Statement stat = conn.createStatement();
+               stat.executeUpdate(sqlQueryBuilder.toString());
+               stat.close();
+
+               sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks (");
+               sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
+               sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
+               sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
+               sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
+               sqlQueryBuilder.append("qty INT DEFAULT NULL,");
+               sqlQueryBuilder.append("PRIMARY KEY (id))");
+
+               stat = conn.createStatement();
+               stat.executeUpdate(sqlQueryBuilder.toString());
+               stat.close();
+
+               sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
+               sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
+               sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
+               sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
+               sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
+               sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
+
+               stat = conn.createStatement();
+               stat.execute(sqlQueryBuilder.toString());
+               stat.close();
+
+               conn.close();
+       }
+}

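Since the Derby instance is in-memory and therefore only visible inside the same JVM, a quick way to check the result of the job is to query the target table right after environment.execute(). The helper below is a hypothetical addition to the JDBCExample class, not part of the commit; it only uses the java.sql types the class already imports, with ResultSet fully qualified.

	// Hypothetical helper: call after environment.execute() in the same JVM to
	// count the rows that JDBCOutputFormat copied into newbooks (expected: 5).
	private static void verifyCopy() throws Exception {
		Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
		Connection conn = DriverManager.getConnection("jdbc:derby:memory:ebookshop");
		Statement stat = conn.createStatement();
		java.sql.ResultSet rs = stat.executeQuery("SELECT COUNT(*) FROM newbooks");
		rs.next();
		System.out.println("rows copied into newbooks: " + rs.getInt(1));
		rs.close();
		stat.close();
		conn.close();
	}
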