http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java new file mode 100644 index 0000000..1801c3e --- /dev/null +++ b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.hadoopcompatibility.mapred; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class HadoopReduceFunctionITCase extends MultipleProgramsTestBase { + + public HadoopReduceFunctionITCase(ExecutionMode mode){ + super(mode); + } + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testStandardGrouping() throws Exception{ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env). + map(new Mapper1()); + + DataSet<Tuple2<IntWritable, IntWritable>> commentCnts = ds. + groupBy(0). 
+ reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(new CommentCntReducer())); + + String resultPath = tempFolder.newFile().toURI().toString(); + + commentCnts.writeAsText(resultPath); + env.execute(); + + String expected = "(0,0)\n"+ + "(1,3)\n" + + "(2,5)\n" + + "(3,5)\n" + + "(4,2)\n"; + + compareResultsByLinesInMemory(expected, resultPath); + } + + @Test + public void testUngroupedHadoopReducer() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env); + + DataSet<Tuple2<IntWritable, IntWritable>> commentCnts = ds. + reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(new AllCommentCntReducer())); + + String resultPath = tempFolder.newFile().toURI().toString(); + + commentCnts.writeAsText(resultPath); + env.execute(); + + String expected = "(42,15)\n"; + + compareResultsByLinesInMemory(expected, resultPath); + } + + @Test + public void testConfigurationViaJobConf() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + JobConf conf = new JobConf(); + conf.set("my.cntPrefix", "Hello"); + + DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env). + map(new Mapper2()); + + DataSet<Tuple2<IntWritable, IntWritable>> helloCnts = ds. + groupBy(0). + reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>( + new ConfigurableCntReducer(), conf)); + + String resultPath = tempFolder.newFile().toURI().toString(); + + helloCnts.writeAsText(resultPath); + env.execute(); + + String expected = "(0,0)\n"+ + "(1,0)\n" + + "(2,1)\n" + + "(3,1)\n" + + "(4,1)\n"; + + compareResultsByLinesInMemory(expected, resultPath); + } + + public static class CommentCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> { + + @Override + public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r) + throws IOException { + int commentCnt = 0; + while(vs.hasNext()) { + String v = vs.next().toString(); + if(v.startsWith("Comment")) { + commentCnt++; + } + } + out.collect(k, new IntWritable(commentCnt)); + } + + @Override + public void configure(final JobConf arg0) { } + + @Override + public void close() throws IOException { } + } + + public static class AllCommentCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> { + + @Override + public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r) + throws IOException { + int commentCnt = 0; + while(vs.hasNext()) { + String v = vs.next().toString(); + if(v.startsWith("Comment")) { + commentCnt++; + } + } + out.collect(new IntWritable(42), new IntWritable(commentCnt)); + } + + @Override + public void configure(final JobConf arg0) { } + + @Override + public void close() throws IOException { } + } + + public static class ConfigurableCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> { + private String countPrefix; + + @Override + public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r) + throws IOException { + int commentCnt = 0; + while(vs.hasNext()) { + String v = vs.next().toString(); + if(v.startsWith(this.countPrefix)) { + commentCnt++; + } + } + out.collect(k, new IntWritable(commentCnt)); + } + + @Override + public void configure(final JobConf c) { + this.countPrefix = 
c.get("my.cntPrefix"); + } + + @Override + public void close() throws IOException { } + } + + public static class Mapper1 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>> { + private static final long serialVersionUID = 1L; + @Override + public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v) + throws Exception { + v.f0 = new IntWritable(v.f0.get() / 5); + return v; + } + } + + public static class Mapper2 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>> { + private static final long serialVersionUID = 1L; + @Override + public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v) + throws Exception { + v.f0 = new IntWritable(v.f0.get() % 5); + return v; + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java new file mode 100644 index 0000000..eed6f8f --- /dev/null +++ b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.hadoopcompatibility.mapred; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; + +public class HadoopTestData { + + public static DataSet<Tuple2<IntWritable, Text>> getKVPairDataSet(ExecutionEnvironment env) { + + List<Tuple2<IntWritable, Text>> data = new ArrayList<Tuple2<IntWritable, Text>>(); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(1),new Text("Hi"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(2),new Text("Hello"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(3),new Text("Hello world"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(4),new Text("Hello world, how are you?"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(5),new Text("I am fine."))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(6),new Text("Luke Skywalker"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(7),new Text("Comment#1"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(8),new Text("Comment#2"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(9),new Text("Comment#3"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(10),new Text("Comment#4"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(11),new Text("Comment#5"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(12),new Text("Comment#6"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(13),new Text("Comment#7"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(14),new Text("Comment#8"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(15),new Text("Comment#9"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(16),new Text("Comment#10"))); + data.add(new Tuple2<IntWritable, Text>(new 
IntWritable(17),new Text("Comment#11"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(18),new Text("Comment#12"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(19),new Text("Comment#13"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(20),new Text("Comment#14"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(21),new Text("Comment#15"))); + + Collections.shuffle(data); + + return env.fromCollection(data); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java new file mode 100644 index 0000000..fe7ea8e --- /dev/null +++ b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.test.hadoopcompatibility.mapred.record; + +import org.apache.flink.api.common.Plan; +import org.apache.flink.hadoopcompatibility.mapred.record.example.WordCountWithOutputFormat; +import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.RecordAPITestBase; + +/** + * Tests the Hadoop input and output formats. + */ +public class HadoopRecordInputOutputITCase extends RecordAPITestBase { + protected String textPath; + protected String resultPath; + protected String counts; + + @Override + protected void preSubmit() throws Exception { + textPath = createTempFile("text.txt", WordCountData.TEXT); + resultPath = getTempDirPath("result"); + counts = WordCountData.COUNTS.replaceAll(" ", "\t"); + } + + @Override + protected Plan getTestJob() { + // WordCountWithOutputFormat reads its input with Hadoop's TextInputFormat and writes its result with Hadoop's TextOutputFormat + WordCountWithOutputFormat wc = new WordCountWithOutputFormat(); + return wc.getPlan("1", textPath, resultPath); + } + + @Override + protected void postSubmit() throws Exception { + // Test results, append /1 to resultPath due to the generated _temporary file. 
+ compareResultsByLinesInMemory(counts, resultPath + "/1"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java new file mode 100644 index 0000000..2592b88 --- /dev/null +++ b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.hadoopcompatibility.mapred.wrapper; + +import java.util.ArrayList; +import java.util.NoSuchElementException; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator; +import org.apache.hadoop.io.IntWritable; +import org.junit.Assert; +import org.junit.Test; + +public class HadoopTupleUnwrappingIteratorTest { + + @Test + public void testValueIterator() { + + HadoopTupleUnwrappingIterator<IntWritable, IntWritable> valIt = + new HadoopTupleUnwrappingIterator<IntWritable, IntWritable>(IntWritable.class); + + // many values + + ArrayList<Tuple2<IntWritable, IntWritable>> tList = new ArrayList<Tuple2<IntWritable, IntWritable>>(); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(1))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(2))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(3))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(4))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(5))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(6))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(7))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(8))); + + int expectedKey = 1; + int[] expectedValues = new int[] {1,2,3,4,5,6,7,8}; + + valIt.set(tList.iterator()); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + for(int expectedValue : expectedValues) { + Assert.assertTrue(valIt.hasNext()); + Assert.assertTrue(valIt.hasNext()); + Assert.assertTrue(valIt.next().get() == expectedValue); + 
Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + } + Assert.assertFalse(valIt.hasNext()); + Assert.assertFalse(valIt.hasNext()); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + + // one value + + tList.clear(); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(2),new IntWritable(10))); + + expectedKey = 2; + expectedValues = new int[]{10}; + + valIt.set(tList.iterator()); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + for(int expectedValue : expectedValues) { + Assert.assertTrue(valIt.hasNext()); + Assert.assertTrue(valIt.hasNext()); + Assert.assertTrue(valIt.next().get() == expectedValue); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + } + Assert.assertFalse(valIt.hasNext()); + Assert.assertFalse(valIt.hasNext()); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + + // more values + + tList.clear(); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(10))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(4))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(7))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(9))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(21))); + + expectedKey = 3; + expectedValues = new int[]{10,4,7,9,21}; + + valIt.set(tList.iterator()); + Assert.assertTrue(valIt.hasNext()); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + for(int expectedValue : expectedValues) { + Assert.assertTrue(valIt.hasNext()); + Assert.assertTrue(valIt.hasNext()); + Assert.assertTrue(valIt.next().get() == expectedValue); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + } + Assert.assertFalse(valIt.hasNext()); + Assert.assertFalse(valIt.hasNext()); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + + // no has next calls + + tList.clear(); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(5))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(8))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(42))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(-1))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(0))); + + expectedKey = 4; + expectedValues = new int[]{5,8,42,-1,0}; + + valIt.set(tList.iterator()); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + for(int expectedValue : expectedValues) { + Assert.assertTrue(valIt.next().get() == expectedValue); + } + try { + valIt.next(); + Assert.fail(); + } catch (NoSuchElementException nsee) { + // expected + } + Assert.assertFalse(valIt.hasNext()); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java new file mode 100644 index 0000000..d79afaa --- /dev/null +++ 
b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.test.hadoopcompatibility.mapreduce; + + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.fail; + + + +public class HadoopInputFormatTest { + + + public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> { + + public DummyVoidKeyInputFormat() { + } + + @Override + public RecordReader<Void, T> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + return null; + } + } + + + @Test + public void checkTypeInformation() { + try { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // Set up the Hadoop Input Format + Job job = Job.getInstance(); + HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<Void, Long>( new DummyVoidKeyInputFormat(), Void.class, Long.class, job); + + TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType(); + TypeInformation<Tuple2<Void,Long>> testTupleType = new TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO); + + if(tupleType.isTupleType()) { + if(!((TupleTypeInfo)tupleType).equals(testTupleType)) { + fail("Tuple type information was not set correctly!"); + } + } else { + fail("Type information was not set to tuple type information!"); + } + + } + catch (Exception ex) { + fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); + } + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java 
b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java new file mode 100644 index 0000000..7eee629 --- /dev/null +++ b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.hadoopcompatibility.mapreduce; + +import org.apache.flink.hadoopcompatibility.mapreduce.example.WordCount; +import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.JavaProgramTestBase; + +public class HadoopInputOutputITCase extends JavaProgramTestBase { + + protected String textPath; + protected String resultPath; + + + @Override + protected void preSubmit() throws Exception { + textPath = createTempFile("text.txt", WordCountData.TEXT); + resultPath = getTempDirPath("result"); + this.setDegreeOfParallelism(4); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[]{".", "_"}); + } + + @Override + protected void testProgram() throws Exception { + WordCount.main(new String[] { textPath, resultPath }); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/test/resources/log4j-test.properties b/flink-staging/flink-hadoop-compatibility/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..0b686e5 --- /dev/null +++ b/flink-staging/flink-hadoop-compatibility/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +# Set root logger level to OFF and its only appender to A1. +log4j.rootLogger=OFF, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/test/resources/logback-test.xml b/flink-staging/flink-hadoop-compatibility/src/test/resources/logback-test.xml new file mode 100644 index 0000000..8b3bb27 --- /dev/null +++ b/flink-staging/flink-hadoop-compatibility/src/test/resources/logback-test.xml @@ -0,0 +1,29 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hbase/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hbase/pom.xml b/flink-staging/flink-hbase/pom.xml new file mode 100644 index 0000000..223389c --- /dev/null +++ b/flink-staging/flink-hbase/pom.xml @@ -0,0 +1,165 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-staging</artifactId> + <version>0.9-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-hbase</artifactId> + <name>flink-hbase</name> + <packaging>jar</packaging> + + <properties> + <hbase.hadoop1.version>0.98.6.1-hadoop1</hbase.hadoop1.version> + <hbase.hadoop2.version>0.98.6.1-hadoop2</hbase.hadoop2.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-hadoop-compatibility</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + <version>${hbase.version}</version> + <exclusions> + <!-- Remove unneeded dependency, which conflicts with our jetty-util version. --> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <profiles> + <profile> + <id>hadoop-1</id> + <activation> + <property> + <!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh --> + <!--hadoop1--><name>hadoop.profile</name><value>1</value> + </property> + </activation> + <properties> + <hbase.version>${hbase.hadoop1.version}</hbase.version> + </properties> + <dependencies> + <!-- Force hadoop-core dependency --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + </dependency> + </dependencies> + </profile> + + <profile> + <id>hadoop-2</id> + <activation> + <property> + <!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh --> + <!--hadoop2--><name>!hadoop.profile</name> + </property> + </activation> + <properties> + <hbase.version>${hbase.hadoop2.version}</hbase.version> + </properties> + <dependencies> + <!-- Force hadoop-common dependency --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + </dependencies> + </profile> + + <profile> + <id>cdh5.1.3</id> + <properties> + <hadoop.profile>2</hadoop.profile> + <hbase.version>0.98.1-cdh5.1.3</hbase.version> + <hadoop.version>2.3.0-cdh5.1.3</hadoop.version> + <!-- Cloudera uses different versions for hadoop-core and hadoop-common --> + <!-- This profile could be removed if Cloudera fixes this mismatch! 
--> + <hadoop.core.version>2.3.0-mr1-cdh5.1.3</hadoop.core.version> + </properties> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + <version>${hadoop.core.version}</version> + </dependency> + </dependencies> + </dependencyManagement> + <dependencies> + <!-- Force hadoop-common dependency --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.version}</version> + </dependency> + </dependencies> + </profile> + + </profiles> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-staging/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java new file mode 100755 index 0000000..9c861ed --- /dev/null +++ b/flink-staging/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.addons.hbase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.LocatableInputSplitAssigner; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link InputFormat} subclass that wraps the access for HTables. 
+ * + */ +public abstract class TableInputFormat<T extends Tuple> implements InputFormat<T, TableInputSplit>{ + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(TableInputFormat.class); + + /** helper variable to decide whether the input is exhausted or not */ + private boolean endReached = false; + + // TODO table and scan could be serialized when kryo serializer will be the default + protected transient HTable table; + protected transient Scan scan; + + /** HBase iterator wrapper */ + private ResultScanner rs; + + private byte[] lastRow; + private int scannedRows; + + // abstract methods allow for multiple table and scanners in the same job + protected abstract Scan getScanner(); + protected abstract String getTableName(); + protected abstract T mapResultToTuple(Result r); + + /** + * creates a {@link Scan} object and a {@link HTable} connection + * + * @param parameters + * @see Configuration + */ + @Override + public void configure(Configuration parameters) { + this.table = createTable(); + this.scan = getScanner(); + } + + /** Create an {@link HTable} instance and set it into this format */ + private HTable createTable() { + LOG.info("Initializing HBaseConfiguration"); + //use files found in the classpath + org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create(); + + try { + return new HTable(hConf, getTableName()); + } catch (Exception e) { + LOG.error("Error instantiating a new HTable instance", e); + } + return null; + } + + @Override + public boolean reachedEnd() throws IOException { + return this.endReached; + } + + @Override + public T nextRecord(T reuse) throws IOException { + if (this.rs == null){ + throw new IOException("No table result scanner provided!"); + } + try{ + Result res = this.rs.next(); + if (res != null){ + scannedRows++; + lastRow = res.getRow(); + return mapResultToTuple(res); + } + }catch (Exception e) { + this.rs.close(); + //workaround for timeout on scan + StringBuffer logMsg = new StringBuffer("Error after scan of ") + .append(scannedRows) + .append(" rows. 
Retry with a new scanner..."); + LOG.warn(logMsg.toString(), e); + this.scan.setStartRow(lastRow); + this.rs = table.getScanner(scan); + Result res = this.rs.next(); + if (res != null) { + scannedRows++; + lastRow = res.getRow(); + return mapResultToTuple(res); + } + } + + this.endReached = true; + return null; + } + + @Override + public void open(TableInputSplit split) throws IOException { + if (split == null){ + throw new IOException("Input split is null!"); + } + if (table == null){ + throw new IOException("No HTable provided!"); + } + if (scan == null){ + throw new IOException("No Scan instance provided"); + } + + logSplitInfo("opening", split); + scan.setStartRow(split.getStartRow()); + lastRow = split.getEndRow(); + scan.setStopRow(lastRow); + + this.rs = table.getScanner(scan); + this.endReached = false; + this.scannedRows = 0; + } + + @Override + public void close() throws IOException { + if(rs!=null){ + this.rs.close(); + } + if(table!=null){ + this.table.close(); + } + LOG.info("Closing split (scanned {} rows)", scannedRows); + this.lastRow = null; + } + + @Override + public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException { + //Gets the starting and ending row keys for every region in the currently open table + final Pair<byte[][], byte[][]> keys = table.getStartEndKeys(); + if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { + throw new IOException("Expecting at least one region."); + } + final byte[] startRow = scan.getStartRow(); + final byte[] stopRow = scan.getStopRow(); + final boolean scanWithNoLowerBound = startRow.length == 0; + final boolean scanWithNoUpperBound = stopRow.length == 0; + + final List<TableInputSplit> splits = new ArrayList<TableInputSplit>(minNumSplits); + for (int i = 0; i < keys.getFirst().length; i++) { + final byte[] startKey = keys.getFirst()[i]; + final byte[] endKey = keys.getSecond()[i]; + final String regionLocation = table.getRegionLocation(startKey, false).getHostnamePort(); + //Test if the given region is to be included in the InputSplit while splitting the regions of a table + if (!includeRegionInSplit(startKey, endKey)) { + continue; + } + //Finds the region on which the given row is being served + final String[] hosts = new String[] { regionLocation }; + + // determine if regions contains keys used by the scan + boolean isLastRegion = endKey.length == 0; + if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0) && + (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) { + + final byte[] splitStart = scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0 ? startKey : startRow; + final byte[] splitStop = (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0) + && !isLastRegion ? endKey : stopRow; + int id = splits.size(); + final TableInputSplit split = new TableInputSplit(id, hosts, table.getTableName(), splitStart, splitStop); + splits.add(split); + } + } + LOG.info("Created " + splits.size() + " splits"); + for (TableInputSplit split : splits) { + logSplitInfo("created", split); + } + return splits.toArray(new TableInputSplit[0]); + } + + private void logSplitInfo(String action, TableInputSplit split) { + int splitId = split.getSplitNumber(); + String splitStart = Bytes.toString(split.getStartRow()); + String splitEnd = Bytes.toString(split.getEndRow()); + String splitStartKey = splitStart.isEmpty() ? "-" : splitStart; + String splitStopKey = splitEnd.isEmpty() ? 
"-" : splitEnd; + String[] hostnames = split.getHostnames(); + LOG.info("{} split [{}|{}|{}|{}]",action, splitId, hostnames, splitStartKey, splitStopKey); + } + + /** + * Test if the given region is to be included in the InputSplit while splitting the regions of a table. + * <p> + * This optimization is effective when there is a specific reasoning to exclude an entire region from the M-R job, + * (and hence, not contributing to the InputSplit), given the start and end keys of the same. <br> + * Useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R + * processing, continuously. In addition to reducing InputSplits, reduces the load on the region server as well, due + * to the ordering of the keys. <br> + * <br> + * Note: It is possible that <code>endKey.length() == 0 </code> , for the last (recent) region. <br> + * Override this method, if you want to bulk exclude regions altogether from M-R. By default, no region is excluded( + * i.e. all regions are included). + * + * @param startKey + * Start key of the region + * @param endKey + * End key of the region + * @return true, if this region needs to be included as part of the input (default). + */ + private static boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) { + return true; + } + + @Override + public InputSplitAssigner getInputSplitAssigner(TableInputSplit[] inputSplits) { + return new LocatableInputSplitAssigner(inputSplits); + } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) { + return null; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java b/flink-staging/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java new file mode 100644 index 0000000..6d8bf42 --- /dev/null +++ b/flink-staging/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.addons.hbase; + +import java.io.IOException; + +import org.apache.flink.core.io.LocatableInputSplit; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +/** + * This class implements a input splits for HBase. Each table input split corresponds to a key range (low, high). All + * references to row below refer to the key of the row. 
+ */ +public class TableInputSplit extends LocatableInputSplit { + + private static final long serialVersionUID = 1L; + + /** + * The name of the table to retrieve data from + */ + private byte[] tableName; + + /** + * The start row of the split. + */ + private byte[] startRow; + + /** + * The end row of the split. + */ + private byte[] endRow; + + /** + * Creates a new table input split + * + * @param splitNumber + * the number of the input split + * @param hostnames + * the names of the hosts storing the data the input split refers to + * @param tableName + * the name of the table to retrieve data from + * @param startRow + * the start row of the split + * @param endRow + * the end row of the split + */ + TableInputSplit(final int splitNumber, final String[] hostnames, final byte[] tableName, final byte[] startRow, + final byte[] endRow) { + super(splitNumber, hostnames); + + this.tableName = tableName; + this.startRow = startRow; + this.endRow = endRow; + } + + /** + * Default constructor for serialization/deserialization. + */ + public TableInputSplit() { + super(); + + this.tableName = null; + this.startRow = null; + this.endRow = null; + } + + /** + * Returns the table name. + * + * @return The table name. + */ + public byte[] getTableName() { + return this.tableName; + } + + /** + * Returns the start row. + * + * @return The start row. + */ + public byte[] getStartRow() { + return this.startRow; + } + + /** + * Returns the end row. + * + * @return The end row. + */ + public byte[] getEndRow() { + return this.endRow; + } + + + @Override + public void write(final DataOutputView out) throws IOException { + + super.write(out); + + // Write the table name + if (this.tableName == null) { + out.writeInt(-1); + } else { + out.writeInt(this.tableName.length); + out.write(this.tableName); + } + + // Write the start row + if (this.startRow == null) { + out.writeInt(-1); + } else { + out.writeInt(this.startRow.length); + out.write(this.startRow); + } + + // Write the end row + if (this.endRow == null) { + out.writeInt(-1); + } else { + out.writeInt(this.endRow.length); + out.write(this.endRow); + } + } + + + @Override + public void read(final DataInputView in) throws IOException { + + super.read(in); + + // Read the table name + int len = in.readInt(); + if (len >= 0) { + this.tableName = new byte[len]; + in.readFully(this.tableName); + } + + // Read the start row + len = in.readInt(); + if (len >= 0) { + this.startRow = new byte[len]; + in.readFully(this.startRow); + } + + // Read the end row + len = in.readInt(); + if (len >= 0) { + this.endRow = new byte[len]; + in.readFully(this.endRow); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java new file mode 100755 index 0000000..b6f345a --- /dev/null +++ b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.addons.hbase.example; + +import org.apache.flink.addons.hbase.TableInputFormat; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Simple stub for HBase DataSet + * + * To run the test first create the test table with hbase shell. + * + * Use the following commands: + * <ul> + * <li>create 'test-table', 'someCf'</li> + * <li>put 'test-table', '1', 'someCf:someQual', 'someString'</li> + * <li>put 'test-table', '2', 'someCf:someQual', 'anotherString'</li> + * </ul> + * + * The test should return just the first entry. + * + */ +public class HBaseReadExample { + public static void main(String[] args) throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + @SuppressWarnings("serial") + DataSet<Tuple2<String, String>> hbaseDs = env.createInput(new TableInputFormat<Tuple2<String, String>>() { + private final byte[] CF_SOME = "someCf".getBytes(); + private final byte[] Q_SOME = "someQual".getBytes(); + @Override + public String getTableName() { + return "test-table"; + } + + @Override + protected Scan getScanner() { + Scan scan = new Scan(); + scan.addColumn(CF_SOME, Q_SOME); + return scan; + } + + private Tuple2<String, String> reuse = new Tuple2<String, String>(); + + @Override + protected Tuple2<String, String> mapResultToTuple(Result r) { + String key = Bytes.toString(r.getRow()); + String val = Bytes.toString(r.getValue(CF_SOME, Q_SOME)); + reuse.setField(key, 0); + reuse.setField(val, 1); + return reuse; + } + }) + .filter(new FilterFunction<Tuple2<String,String>>() { + + @Override + public boolean filter(Tuple2<String, String> t) throws Exception { + String val = t.getField(1); + if(val.startsWith("someStr")) + return true; + return false; + } + }); + + hbaseDs.print(); + + // kick off execution. + env.execute(); + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hbase/src/test/resources/hbase-site.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hbase/src/test/resources/hbase-site.xml b/flink-staging/flink-hbase/src/test/resources/hbase-site.xml new file mode 100644 index 0000000..2984063 --- /dev/null +++ b/flink-staging/flink-hbase/src/test/resources/hbase-site.xml @@ -0,0 +1,43 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +--> +<configuration> + + <property> + <name>hbase.tmp.dir</name> + <!-- + <value>/media/Dati/hbase-0.98-data</value> + --> + <value>/opt/hbase-0.98.6.1-hadoop2/data</value> + + </property> + <property> + <name>hbase.zookeeper.quorum</name> + <value>localhost</value> + </property> + <!-- + <property> + <name>hadoop.security.group.mapping</name> + <value>org.apache.hadoop.security.ShellBasedUnixGroupsMapping</value> + </property> + --> +</configuration> http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hbase/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hbase/src/test/resources/log4j.properties b/flink-staging/flink-hbase/src/test/resources/log4j.properties new file mode 100755 index 0000000..d6eb2b2 --- /dev/null +++ b/flink-staging/flink-hbase/src/test/resources/log4j.properties @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +log4j.rootLogger=${hadoop.root.logger} +hadoop.root.logger=INFO,console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-jdbc/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-jdbc/pom.xml b/flink-staging/flink-jdbc/pom.xml new file mode 100644 index 0000000..7182984 --- /dev/null +++ b/flink-staging/flink-jdbc/pom.xml @@ -0,0 +1,64 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-staging</artifactId> + <version>0.9-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-jdbc</artifactId> + <name>flink-jdbc</name> + + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.derby</groupId> + <artifactId>derby</artifactId> + <version>10.10.1.1</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java new file mode 100644 index 0000000..3cfaeb9 --- /dev/null +++ b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io.jdbc; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; + +import org.apache.flink.api.common.io.NonParallelInput; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.types.NullValue; + +/** + * InputFormat to read data from a database and generate tuples. + * The InputFormat has to be configured using the supplied InputFormatBuilder. + * + * @param <OUT> The type of the tuples this InputFormat produces. + * @see Tuple + * @see DriverManager + */ +public class JDBCInputFormat<OUT extends Tuple> implements InputFormat<OUT, InputSplit>, NonParallelInput { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class); + + private String username; + private String password; + private String drivername; + private String dbURL; + private String query; + + private transient Connection dbConn; + private transient Statement statement; + private transient ResultSet resultSet; + + private int[] columnTypes = null; + + public JDBCInputFormat() { + } + + @Override + public void configure(Configuration parameters) { + } + + /** + * Connects to the source database and executes the query. + * + * @param ignored + * @throws IOException + */ + @Override + public void open(InputSplit ignored) throws IOException { + try { + establishConnection(); + statement = dbConn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY); + resultSet = statement.executeQuery(query); + } catch (SQLException se) { + close(); + throw new IllegalArgumentException("open() failed. " + se.getMessage(), se); + } catch (ClassNotFoundException cnfe) { + throw new IllegalArgumentException("JDBC class not found. " + cnfe.getMessage(), cnfe); + } + } + + private void establishConnection() throws SQLException, ClassNotFoundException { + Class.forName(drivername); + if (username == null) { + dbConn = DriverManager.getConnection(dbURL); + } else { + dbConn = DriverManager.getConnection(dbURL, username, password); + } + } + + /** + * Closes all resources used. + * + * @throws IOException Indicates that a resource could not be closed. + */ + @Override + public void close() throws IOException { + try { + resultSet.close(); + } catch (SQLException se) { + LOG.info("Inputformat couldn't be closed - " + se.getMessage()); + } catch (NullPointerException npe) { + } + try { + statement.close(); + } catch (SQLException se) { + LOG.info("Inputformat couldn't be closed - " + se.getMessage()); + } catch (NullPointerException npe) { + } + try { + dbConn.close(); + } catch (SQLException se) { + LOG.info("Inputformat couldn't be closed - " + se.getMessage()); + } catch (NullPointerException npe) { + } + } + + /** + * Checks whether all data has been read. + * + * @return boolean value indicating whether all data has been read.
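+ * Note: this check relies on {@code ResultSet.isLast()}, whose support is optional for forward-only result sets (presumably why {@code open()} creates the statement as {@code TYPE_SCROLL_INSENSITIVE}).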
+ * @throws IOException + */ + @Override + public boolean reachedEnd() throws IOException { + try { + if (resultSet.isLast()) { + close(); + return true; + } + return false; + } catch (SQLException se) { + throw new IOException("Couldn't evaluate reachedEnd() - " + se.getMessage(), se); + } + } + + /** + * Stores the next resultSet row in a tuple. + * + * @param tuple The tuple to store the next row in. + * @return tuple containing the next row + * @throws java.io.IOException + */ + @Override + public OUT nextRecord(OUT tuple) throws IOException { + try { + resultSet.next(); + if (columnTypes == null) { + extractTypes(tuple); + } + addValue(tuple); + return tuple; + } catch (SQLException se) { + close(); + throw new IOException("Couldn't read data - " + se.getMessage(), se); + } catch (NullPointerException npe) { + close(); + throw new IOException("Couldn't access resultSet", npe); + } + } + + private void extractTypes(OUT tuple) throws SQLException, IOException { + ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); + columnTypes = new int[resultSetMetaData.getColumnCount()]; + if (tuple.getArity() != columnTypes.length) { + close(); + throw new IOException("Tuple size does not match column count"); + } + for (int pos = 0; pos < columnTypes.length; pos++) { + columnTypes[pos] = resultSetMetaData.getColumnType(pos + 1); + } + } + + /** + * Copies the values of the current resultSet row into a record. + * + * @param reuse Target record. + */ + private void addValue(OUT reuse) throws SQLException { + for (int pos = 0; pos < columnTypes.length; pos++) { + switch (columnTypes[pos]) { + case java.sql.Types.NULL: + reuse.setField(NullValue.getInstance(), pos); + break; + case java.sql.Types.BOOLEAN: + reuse.setField(resultSet.getBoolean(pos + 1), pos); + break; + case java.sql.Types.BIT: + reuse.setField(resultSet.getBoolean(pos + 1), pos); + break; + case java.sql.Types.CHAR: + reuse.setField(resultSet.getString(pos + 1), pos); + break; + case java.sql.Types.NCHAR: + reuse.setField(resultSet.getString(pos + 1), pos); + break; + case java.sql.Types.VARCHAR: + reuse.setField(resultSet.getString(pos + 1), pos); + break; + case java.sql.Types.LONGVARCHAR: + reuse.setField(resultSet.getString(pos + 1), pos); + break; + case java.sql.Types.LONGNVARCHAR: + reuse.setField(resultSet.getString(pos + 1), pos); + break; + case java.sql.Types.TINYINT: + reuse.setField(resultSet.getShort(pos + 1), pos); + break; + case java.sql.Types.SMALLINT: + reuse.setField(resultSet.getShort(pos + 1), pos); + break; + case java.sql.Types.BIGINT: + reuse.setField(resultSet.getLong(pos + 1), pos); + break; + case java.sql.Types.INTEGER: + reuse.setField(resultSet.getInt(pos + 1), pos); + break; + case java.sql.Types.FLOAT: + reuse.setField(resultSet.getDouble(pos + 1), pos); + break; + case java.sql.Types.REAL: + reuse.setField(resultSet.getFloat(pos + 1), pos); + break; + case java.sql.Types.DOUBLE: + reuse.setField(resultSet.getDouble(pos + 1), pos); + break; + case java.sql.Types.DECIMAL: + reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos); + break; + case java.sql.Types.NUMERIC: + reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos); + break; + case java.sql.Types.DATE: + reuse.setField(resultSet.getDate(pos + 1).toString(), pos); + break; + case java.sql.Types.TIME: + reuse.setField(resultSet.getTime(pos + 1).getTime(), pos); + break; + case java.sql.Types.TIMESTAMP: + reuse.setField(resultSet.getTimestamp(pos + 1).toString(), pos); + break; + case java.sql.Types.SQLXML: + reuse.setField(resultSet.getSQLXML(pos +
1).toString(), pos); + break; + default: + throw new SQLException("Unsupported sql-type [" + columnTypes[pos] + "] on column [" + pos + "]"); + + // case java.sql.Types.BINARY: + // case java.sql.Types.VARBINARY: + // case java.sql.Types.LONGVARBINARY: + // case java.sql.Types.ARRAY: + // case java.sql.Types.JAVA_OBJECT: + // case java.sql.Types.BLOB: + // case java.sql.Types.CLOB: + // case java.sql.Types.NCLOB: + // case java.sql.Types.DATALINK: + // case java.sql.Types.DISTINCT: + // case java.sql.Types.OTHER: + // case java.sql.Types.REF: + // case java.sql.Types.ROWID: + // case java.sql.Types.STRUCT: + } + } + } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { + return cachedStatistics; + } + + @Override + public InputSplit[] createInputSplits(int minNumSplits) throws IOException { + GenericInputSplit[] split = { + new GenericInputSplit(0, 1) + }; + return split; + } + + @Override + public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) { + return new DefaultInputSplitAssigner(inputSplits); + } + + + /** + * A builder used to set parameters to the input format's configuration in a fluent way. + * @return builder + */ + public static JDBCInputFormatBuilder buildJDBCInputFormat() { + return new JDBCInputFormatBuilder(); + } + + public static class JDBCInputFormatBuilder { + private final JDBCInputFormat format; + + public JDBCInputFormatBuilder() { + this.format = new JDBCInputFormat(); + } + + public JDBCInputFormatBuilder setUsername(String username) { + format.username = username; + return this; + } + + public JDBCInputFormatBuilder setPassword(String password) { + format.password = password; + return this; + } + + public JDBCInputFormatBuilder setDrivername(String drivername) { + format.drivername = drivername; + return this; + } + + public JDBCInputFormatBuilder setDBUrl(String dbURL) { + format.dbURL = dbURL; + return this; + } + + public JDBCInputFormatBuilder setQuery(String query) { + format.query = query; + return this; + } + + public JDBCInputFormat finish() { + if (format.username == null) { + LOG.info("Username was not supplied separately."); + } + if (format.password == null) { + LOG.info("Password was not supplied separately."); + } + if (format.dbURL == null) { + throw new IllegalArgumentException("No database URL supplied."); + } + if (format.query == null) { + throw new IllegalArgumentException("No query supplied."); + } + if (format.drivername == null) { + throw new IllegalArgumentException("No driver supplied."); + } + return format; + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java new file mode 100644 index 0000000..6771772 --- /dev/null +++ b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io.jdbc; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.flink.api.common.io.OutputFormat; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.configuration.Configuration; + +/** + * OutputFormat to write tuples into a database. + * The OutputFormat has to be configured using the supplied OutputFormatBuilder. + * + * @param <OUT> The type of the tuples this OutputFormat consumes. + * @see Tuple + * @see DriverManager + */ +public class JDBCOutputFormat<OUT extends Tuple> implements OutputFormat<OUT> { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class); + + private String username; + private String password; + private String drivername; + private String dbURL; + private String query; + private int batchInterval = 5000; + + private Connection dbConn; + private PreparedStatement upload; + + private SupportedTypes[] types = null; + + private int batchCount = 0; + + public JDBCOutputFormat() { + } + + @Override + public void configure(Configuration parameters) { + } + + /** + * Connects to the target database and initializes the prepared statement. + * + * @param taskNumber The number of the parallel instance. + * @param numTasks The number of parallel tasks. + * @throws IOException Thrown, if the output could not be opened due to an + * I/O problem. + */ + @Override + public void open(int taskNumber, int numTasks) throws IOException { + try { + establishConnection(); + upload = dbConn.prepareStatement(query); + } catch (SQLException sqe) { + close(); + throw new IllegalArgumentException("open() failed.", sqe); + } catch (ClassNotFoundException cnfe) { + close(); + throw new IllegalArgumentException("JDBC class not found.", cnfe); + } + } + + private void establishConnection() throws SQLException, ClassNotFoundException { + Class.forName(drivername); + if (username == null) { + dbConn = DriverManager.getConnection(dbURL); + } else { + dbConn = DriverManager.getConnection(dbURL, username, password); + } + } + + private enum SupportedTypes { + BOOLEAN, + BYTE, + SHORT, + INTEGER, + LONG, + STRING, + FLOAT, + DOUBLE + } + + /** + * Adds a record to the prepared statement. + * <p> + * When this method is called, the output format is guaranteed to be opened. + * + * @param tuple The record to add to the output. + * @throws IOException Thrown, if the record could not be added due to an I/O problem.
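+ * Records are buffered rather than written immediately: a JDBC batch is executed once {@code batchInterval} records (5000 by default) have been added; any remainder is flushed in {@code close()}.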
+ */ + @Override + public void writeRecord(OUT tuple) throws IOException { + try { + if (types == null) { + extractTypes(tuple); + } + addValues(tuple); + upload.addBatch(); + batchCount++; + if (batchCount >= batchInterval) { + upload.executeBatch(); + batchCount = 0; + } + } catch (SQLException sqe) { + close(); + throw new IllegalArgumentException("writeRecord() failed", sqe); + } catch (IllegalArgumentException iae) { + close(); + throw new IllegalArgumentException("writeRecord() failed", iae); + } + } + + private void extractTypes(OUT tuple) { + types = new SupportedTypes[tuple.getArity()]; + for (int x = 0; x < tuple.getArity(); x++) { + types[x] = SupportedTypes.valueOf(tuple.getField(x).getClass().getSimpleName().toUpperCase()); + } + } + + private void addValues(OUT tuple) throws SQLException { + for (int index = 0; index < tuple.getArity(); index++) { + switch (types[index]) { + case BOOLEAN: + upload.setBoolean(index + 1, (Boolean) tuple.getField(index)); + break; + case BYTE: + upload.setByte(index + 1, (Byte) tuple.getField(index)); + break; + case SHORT: + upload.setShort(index + 1, (Short) tuple.getField(index)); + break; + case INTEGER: + upload.setInt(index + 1, (Integer) tuple.getField(index)); + break; + case LONG: + upload.setLong(index + 1, (Long) tuple.getField(index)); + break; + case STRING: + upload.setString(index + 1, (String) tuple.getField(index)); + break; + case FLOAT: + upload.setFloat(index + 1, (Float) tuple.getField(index)); + break; + case DOUBLE: + upload.setDouble(index + 1, (Double) tuple.getField(index)); + break; + } + } + } + + /** + * Executes the prepared statement and closes all resources of this instance. + * + * @throws IOException Thrown, if the output could not be closed properly. + */ + @Override + public void close() throws IOException { + try { + upload.executeBatch(); + batchCount = 0; + } catch (SQLException se) { + throw new IllegalArgumentException("close() failed", se); + } catch (NullPointerException npe) { + } + try { + upload.close(); + } catch (SQLException se) { + LOG.info("Outputformat couldn't be closed - " + se.getMessage()); + } catch (NullPointerException npe) { + } + try { + dbConn.close(); + } catch (SQLException se) { + LOG.info("Outputformat couldn't be closed - " + se.getMessage()); + } catch (NullPointerException npe) { + } + } + + public static JDBCOutputFormatBuilder buildJDBCOutputFormat() { + return new JDBCOutputFormatBuilder(); + } + + public static class JDBCOutputFormatBuilder { + private final JDBCOutputFormat format; + + protected JDBCOutputFormatBuilder() { + this.format = new JDBCOutputFormat(); + } + + public JDBCOutputFormatBuilder setUsername(String username) { + format.username = username; + return this; + } + + public JDBCOutputFormatBuilder setPassword(String password) { + format.password = password; + return this; + } + + public JDBCOutputFormatBuilder setDrivername(String drivername) { + format.drivername = drivername; + return this; + } + + public JDBCOutputFormatBuilder setDBUrl(String dbURL) { + format.dbURL = dbURL; + return this; + } + + public JDBCOutputFormatBuilder setQuery(String query) { + format.query = query; + return this; + } + + public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) { + format.batchInterval = batchInterval; + return this; + } + + /** + Finalizes the configuration and checks validity.
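+ Username and password are optional; if they are not supplied separately, they are assumed to be part of the database URL and only an informational message is logged.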
+ @return Configured JDBCOutputFormat + */ + public JDBCOutputFormat finish() { + if (format.username == null) { + LOG.info("Username was not supplied separately."); + } + if (format.password == null) { + LOG.info("Password was not supplied separately."); + } + if (format.dbURL == null) { + throw new IllegalArgumentException("No database URL supplied."); + } + if (format.query == null) { + throw new IllegalArgumentException("No query supplied."); + } + if (format.drivername == null) { + throw new IllegalArgumentException("No driver supplied."); + } + return format; + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java new file mode 100644 index 0000000..7b012ba --- /dev/null +++ b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.api.java.io.jdbc.example; + +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO; +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO; +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.jdbc.JDBCInputFormat; +import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; + +public class JDBCExample { + + public static void main(String[] args) throws Exception { + prepareTestDb(); + + ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple5> source + = environment.createInput(JDBCInputFormat.buildJDBCInputFormat() + .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") + .setDBUrl("jdbc:derby:memory:ebookshop") + .setQuery("select * from books") + .finish(), + new TupleTypeInfo(Tuple5.class, INT_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO, DOUBLE_TYPE_INFO, INT_TYPE_INFO) + ); + + source.output(JDBCOutputFormat.buildJDBCOutputFormat() + .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") + .setDBUrl("jdbc:derby:memory:ebookshop") + .setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)") + .finish()); + environment.execute(); + } + + private static void prepareTestDb() throws Exception { + String dbURL = "jdbc:derby:memory:ebookshop;create=true"; + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + Connection conn = DriverManager.getConnection(dbURL); + + StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books ("); + sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,"); + sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,"); + sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,"); + sqlQueryBuilder.append("price FLOAT DEFAULT NULL,"); + sqlQueryBuilder.append("qty INT DEFAULT NULL,"); + sqlQueryBuilder.append("PRIMARY KEY (id))"); + + Statement stat = conn.createStatement(); + stat.executeUpdate(sqlQueryBuilder.toString()); + stat.close(); + + sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks ("); + sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,"); + sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,"); + sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,"); + sqlQueryBuilder.append("price FLOAT DEFAULT NULL,"); + sqlQueryBuilder.append("qty INT DEFAULT NULL,"); + sqlQueryBuilder.append("PRIMARY KEY (id))"); + + stat = conn.createStatement(); + stat.executeUpdate(sqlQueryBuilder.toString()); + stat.close(); + + sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES "); + sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),"); + sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),"); + sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),"); + sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),"); + sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)"); + + stat = conn.createStatement(); + stat.execute(sqlQueryBuilder.toString()); + stat.close(); + + conn.close(); + } +}
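For completeness, a minimal sketch of how the round trip above could be verified by reading the target table back through the same builder API. This is not part of the commit: the driver name, URL, and table name are taken from JDBCExample, the raw TupleTypeInfo usage mirrors the example, the names verifyEnv and written are illustrative, and the code assumes the same static imports and a run inside the same JVM (the in-memory Derby database does not outlive it).

// Hypothetical read-back of the 'newbooks' table filled by JDBCExample.
ExecutionEnvironment verifyEnv = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5> written = verifyEnv.createInput(
        JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                .setDBUrl("jdbc:derby:memory:ebookshop")
                .setQuery("select * from newbooks")
                .finish(),
        new TupleTypeInfo(Tuple5.class, INT_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO, DOUBLE_TYPE_INFO, INT_TYPE_INFO));

written.print();       // registers a print sink, as in HBaseReadExample above
verifyEnv.execute();   // runs the job; the five inserted rows should appear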