http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java deleted file mode 100644 index 89baa98..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.record; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.lang3.SerializationUtils; -import org.apache.flink.api.common.functions.GroupCombineFunction; -import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.api.common.functions.RichFunction; -import org.apache.flink.api.common.operators.SingleInputSemanticProperties; -import org.apache.flink.api.common.operators.util.FieldSet; -import org.apache.flink.api.common.operators.util.UserCodeWrapper; -import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields; -import org.apache.flink.api.java.record.functions.ReduceFunction; -import org.apache.flink.api.java.record.operators.ReduceOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.Record; -import org.apache.flink.util.Collector; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -@SuppressWarnings({ "serial", "deprecation" }) -public class ReduceWrappingFunctionTest { - - @SuppressWarnings("unchecked") - @Test - public void testWrappedReduceObject() { - try { - AtomicInteger methodCounter = new AtomicInteger(); - - ReduceOperator reduceOp = ReduceOperator.builder(new TestReduceFunction(methodCounter)).build(); - - RichFunction reducer = (RichFunction) reduceOp.getUserCodeWrapper().getUserCodeObject(); - - // test the method invocations - reducer.close(); - reducer.open(new Configuration()); - assertEquals(2, methodCounter.get()); - - // prepare the reduce / combine tests - final List<Record> target = new ArrayList<Record>(); - Collector<Record> collector = new Collector<Record>() { - @Override - public void collect(Record record) { - target.add(record); - } - @Override - public void close() {} - }; - - List<Record> source = new ArrayList<Record>(); - source.add(new Record(new IntValue(42), new LongValue(11))); - source.add(new Record(new IntValue(13), new LongValue(17))); - - // test reduce - ((GroupReduceFunction<Record, Record>) reducer).reduce(source, collector); - assertEquals(2, target.size()); - assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class)); - assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class)); - assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class)); - assertEquals(new LongValue(17), target.get(1).getField(1, LongValue.class)); - target.clear(); - - // test combine - ((GroupCombineFunction<Record, Record>) reducer).combine(source, collector); - assertEquals(2, target.size()); - assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class)); - assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class)); - assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class)); - assertEquals(new LongValue(17), target.get(1).getField(1, LongValue.class)); - target.clear(); - - // test the serialization - SerializationUtils.clone((java.io.Serializable) reducer); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @SuppressWarnings("unchecked") - @Test - public void testWrappedReduceClass() { - try { - ReduceOperator reduceOp = ReduceOperator.builder(TestReduceFunction.class).build(); - - UserCodeWrapper<GroupReduceFunction<Record, Record>> udf = reduceOp.getUserCodeWrapper(); - UserCodeWrapper<GroupReduceFunction<Record, Record>> copy = SerializationUtils.clone(udf); - GroupReduceFunction<Record, Record> reducer = copy.getUserCodeObject(); - - // prepare the reduce / combine tests - final List<Record> target = new ArrayList<Record>(); - Collector<Record> collector = new Collector<Record>() { - @Override - public void collect(Record record) { - target.add(record); - } - @Override - public void close() {} - }; - - List<Record> source = new ArrayList<Record>(); - source.add(new Record(new IntValue(42), new LongValue(11))); - source.add(new Record(new IntValue(13), new LongValue(17))); - - // test reduce - reducer.reduce(source, collector); - assertEquals(2, target.size()); - assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class)); - assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class)); - assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class)); - assertEquals(new LongValue(17), target.get(1).getField(1, LongValue.class)); - target.clear(); - - // test combine - ((GroupCombineFunction<Record, Record>) reducer).combine(source, collector); - assertEquals(2, target.size()); - assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class)); - assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class)); - assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class)); - assertEquals(new LongValue(17), target.get(1).getField(1, LongValue.class)); - target.clear(); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testExtractSemantics() { - try { - { - ReduceOperator reduceOp = ReduceOperator.builder(new TestReduceFunction()).build(); - - SingleInputSemanticProperties props = reduceOp.getSemanticProperties(); - FieldSet fw2 = props.getForwardingTargetFields(0, 2); - FieldSet fw4 = props.getForwardingTargetFields(0, 4); - assertNotNull(fw2); - assertNotNull(fw4); - assertEquals(1, fw2.size()); - assertEquals(1, fw4.size()); - assertTrue(fw2.contains(2)); - assertTrue(fw4.contains(4)); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCombinable() { - try { - { - ReduceOperator reduceOp = ReduceOperator.builder(new TestReduceFunction()).build(); - assertTrue(reduceOp.isCombinable()); - } - { - ReduceOperator reduceOp = ReduceOperator.builder(TestReduceFunction.class).build(); - assertTrue(reduceOp.isCombinable()); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - // -------------------------------------------------------------------------------------------- - - @Combinable - @ConstantFields({2, 4}) - public static class TestReduceFunction extends ReduceFunction { - - private final AtomicInteger methodCounter; - - private TestReduceFunction(AtomicInteger methodCounter) { - this.methodCounter= methodCounter; - } - - public TestReduceFunction() { - methodCounter = new AtomicInteger(); - } - - @Override - public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception { - while (records.hasNext()) { - out.collect(records.next()); - } - } - - @Override - public void close() throws Exception { - methodCounter.incrementAndGet(); - super.close(); - } - - @Override - public void open(Configuration parameters) throws Exception { - methodCounter.incrementAndGet(); - super.open(parameters); - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java deleted file mode 100644 index 5ccfdd9..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java +++ /dev/null @@ -1,406 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.record.io; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; - -import org.junit.Assert; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.Path; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; -import org.apache.flink.types.StringValue; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class CsvInputFormatTest { - - protected File tempFile; - - private final CsvInputFormat format = new CsvInputFormat(); - - //Static variables for testing the removal of \r\n to \n - private static final String FIRST_PART = "That is the first part"; - - private static final String SECOND_PART = "That is the second part"; - - // -------------------------------------------------------------------------------------------- - @Before - public void setup() { - format.setFilePath("file:///some/file/that/will/not/be/read"); - } - - @After - public void setdown() throws Exception { - if (this.format != null) { - this.format.close(); - } - if (this.tempFile != null) { - this.tempFile.delete(); - } - } - - @Test - public void testConfigureEmptyConfig() { - try { - Configuration config = new Configuration(); - - // empty configuration, plus no fields on the format itself is not valid - try { - format.configure(config); - fail(); // should give an error - } catch (IllegalConfigurationException e) { - ; // okay - } - } - catch (Exception ex) { - Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); - } - } - - @SuppressWarnings("unchecked") - @Test - public void readWithEmptyFieldInstanceParameters() { - try { - final String fileContent = "abc|def|ghijk\nabc||hhg\n|||"; - final FileInputSplit split = createTempFile(fileContent); - - final Configuration parameters = new Configuration(); - - format.setFieldDelimiter('|'); - format.setFieldTypes(StringValue.class, StringValue.class, StringValue.class); - - format.configure(parameters); - format.open(split); - - Record record = new Record(); - - assertNotNull(format.nextRecord(record)); - assertEquals("abc", record.getField(0, StringValue.class).getValue()); - assertEquals("def", record.getField(1, StringValue.class).getValue()); - assertEquals("ghijk", record.getField(2, StringValue.class).getValue()); - - assertNotNull(format.nextRecord(record)); - assertEquals("abc", record.getField(0, StringValue.class).getValue()); - assertEquals("", record.getField(1, StringValue.class).getValue()); - assertEquals("hhg", record.getField(2, StringValue.class).getValue()); - - assertNotNull(format.nextRecord(record)); - assertEquals("", record.getField(0, StringValue.class).getValue()); - assertEquals("", record.getField(1, StringValue.class).getValue()); - assertEquals("", record.getField(2, StringValue.class).getValue()); - } - catch (Exception ex) { - ex.printStackTrace(); - Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); - } - } - - @Test - public void readWithEmptyFieldConfigParameters() { - try { - final String fileContent = "abc|def|ghijk\nabc||hhg\n|||"; - final FileInputSplit split = createTempFile(fileContent); - - final Configuration parameters = new Configuration(); - new CsvInputFormat.ConfigBuilder(null, parameters) - .field(StringValue.class, 0).field(StringValue.class, 1).field(StringValue.class, 2); - - format.setFieldDelimiter("|"); - - format.configure(parameters); - format.open(split); - - Record record = new Record(); - - assertNotNull(format.nextRecord(record)); - assertEquals("abc", record.getField(0, StringValue.class).getValue()); - assertEquals("def", record.getField(1, StringValue.class).getValue()); - assertEquals("ghijk", record.getField(2, StringValue.class).getValue()); - - assertNotNull(format.nextRecord(record)); - assertEquals("abc", record.getField(0, StringValue.class).getValue()); - assertEquals("", record.getField(1, StringValue.class).getValue()); - assertEquals("hhg", record.getField(2, StringValue.class).getValue()); - - assertNotNull(format.nextRecord(record)); - assertEquals("", record.getField(0, StringValue.class).getValue()); - assertEquals("", record.getField(1, StringValue.class).getValue()); - assertEquals("", record.getField(2, StringValue.class).getValue()); - } - catch (Exception ex) { - Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); - } - } - - @Test - public void testReadAll() throws IOException { - try { - final String fileContent = "111|222|333|444|555\n666|777|888|999|000|"; - final FileInputSplit split = createTempFile(fileContent); - - final Configuration parameters = new Configuration(); - - new CsvInputFormat.ConfigBuilder(null, parameters) - .fieldDelimiter('|') - .field(IntValue.class, 0).field(IntValue.class, 1).field(IntValue.class, 2) - .field(IntValue.class, 3).field(IntValue.class, 4); - - format.configure(parameters); - format.open(split); - - Record record = new Record(); - - assertNotNull(format.nextRecord(record)); - assertEquals(111, record.getField(0, IntValue.class).getValue()); - assertEquals(222, record.getField(1, IntValue.class).getValue()); - assertEquals(333, record.getField(2, IntValue.class).getValue()); - assertEquals(444, record.getField(3, IntValue.class).getValue()); - assertEquals(555, record.getField(4, IntValue.class).getValue()); - - assertNotNull(format.nextRecord(record)); - assertEquals(666, record.getField(0, IntValue.class).getValue()); - assertEquals(777, record.getField(1, IntValue.class).getValue()); - assertEquals(888, record.getField(2, IntValue.class).getValue()); - assertEquals(999, record.getField(3, IntValue.class).getValue()); - assertEquals(000, record.getField(4, IntValue.class).getValue()); - - assertNull(format.nextRecord(record)); - assertTrue(format.reachedEnd()); - } - catch (Exception ex) { - Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); - } - } - - @Test - public void testReadFirstN() throws IOException { - try { - final String fileContent = "111|222|333|444|555|\n666|777|888|999|000|"; - final FileInputSplit split = createTempFile(fileContent); - - final Configuration parameters = new Configuration(); - - new CsvInputFormat.ConfigBuilder(null, parameters) - .fieldDelimiter('|') - .field(IntValue.class, 0).field(IntValue.class, 1); - - format.configure(parameters); - format.open(split); - - Record record = new Record(); - - assertNotNull(format.nextRecord(record)); - assertEquals(111, record.getField(0, IntValue.class).getValue()); - assertEquals(222, record.getField(1, IntValue.class).getValue()); - boolean notParsed = false; - try { - record.getField(2, IntValue.class); - } catch (IndexOutOfBoundsException ioo) { - notParsed = true; - } - assertTrue(notParsed); - - assertNotNull(format.nextRecord(record)); - assertEquals(666, record.getField(0, IntValue.class).getValue()); - assertEquals(777, record.getField(1, IntValue.class).getValue()); - notParsed = false; - try { - record.getField(2, IntValue.class); - } catch (IndexOutOfBoundsException ioo) { - notParsed = true; - } - assertTrue(notParsed); - - assertNull(format.nextRecord(record)); - assertTrue(format.reachedEnd()); - } - catch (Exception ex) { - Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); - } - - } - - @Test - public void testReadSparse() throws IOException { - try { - final String fileContent = "111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666|555|444|333|222|111|"; - final FileInputSplit split = createTempFile(fileContent); - - final Configuration parameters = new Configuration(); - - new CsvInputFormat.ConfigBuilder(null, parameters) - .fieldDelimiter('|') - .field(IntValue.class, 0).field(IntValue.class, 3).field(IntValue.class, 7); - - format.configure(parameters); - format.open(split); - - Record record = new Record(); - - assertNotNull(format.nextRecord(record)); - assertEquals(111, record.getField(0, IntValue.class).getValue()); - assertEquals(444, record.getField(1, IntValue.class).getValue()); - assertEquals(888, record.getField(2, IntValue.class).getValue()); - - assertNotNull(format.nextRecord(record)); - assertEquals(000, record.getField(0, IntValue.class).getValue()); - assertEquals(777, record.getField(1, IntValue.class).getValue()); - assertEquals(333, record.getField(2, IntValue.class).getValue()); - - assertNull(format.nextRecord(record)); - assertTrue(format.reachedEnd()); - } - catch (Exception ex) { - Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); - } - } - - @Test - public void testReadSparseShufflePosition() throws IOException { - try { - final String fileContent = "111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666|555|444|333|222|111|"; - final FileInputSplit split = createTempFile(fileContent); - - final Configuration parameters = new Configuration(); - - new CsvInputFormat.ConfigBuilder(null, parameters) - .fieldDelimiter('|') - .field(IntValue.class, 8).field(IntValue.class, 1).field(IntValue.class, 3); - - format.configure(parameters); - format.open(split); - - Record record = new Record(); - - assertNotNull(format.nextRecord(record)); - assertEquals(999, record.getField(0, IntValue.class).getValue()); - assertEquals(222, record.getField(1, IntValue.class).getValue()); - assertEquals(444, record.getField(2, IntValue.class).getValue()); - - assertNotNull(format.nextRecord(record)); - assertEquals(222, record.getField(0, IntValue.class).getValue()); - assertEquals(999, record.getField(1, IntValue.class).getValue()); - assertEquals(777, record.getField(2, IntValue.class).getValue()); - - assertNull(format.nextRecord(record)); - assertTrue(format.reachedEnd()); - } - catch (Exception ex) { - Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); - } - } - - private FileInputSplit createTempFile(String content) throws IOException { - this.tempFile = File.createTempFile("test_contents", "tmp"); - this.tempFile.deleteOnExit(); - - DataOutputStream dos = new DataOutputStream(new FileOutputStream(tempFile)); - dos.writeBytes(content); - dos.close(); - - return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"}); - } - - @Test - public void testWindowsLineEndRemoval() { - - //Check typical use case -- linux file is correct and it is set up to linuc(\n) - this.testRemovingTrailingCR("\n", "\n"); - - //Check typical windows case -- windows file endings and file has windows file endings set up - this.testRemovingTrailingCR("\r\n", "\r\n"); - - //Check problematic case windows file -- windows file endings(\r\n) but linux line endings (\n) set up - this.testRemovingTrailingCR("\r\n", "\n"); - - //Check problematic case linux file -- linux file endings (\n) but windows file endings set up (\r\n) - //Specific setup for windows line endings will expect \r\n because it has to be set up and is not standard. - } - - private void testRemovingTrailingCR(String lineBreakerInFile, String lineBreakerSetup) { - File tempFile=null; - - String fileContent = CsvInputFormatTest.FIRST_PART + lineBreakerInFile + CsvInputFormatTest.SECOND_PART + lineBreakerInFile; - - try { - // create input file - tempFile = File.createTempFile("CsvInputFormatTest", "tmp"); - tempFile.deleteOnExit(); - tempFile.setWritable(true); - - OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile)); - wrt.write(fileContent); - wrt.close(); - - //Instantiate input format - CsvInputFormat inputFormat = new CsvInputFormat(); - - Configuration parameters = new Configuration(); - new CsvInputFormat.ConfigBuilder(null, parameters) - .field(StringValue.class, 0).filePath(tempFile.toURI().toString()); - - - inputFormat.configure(parameters); - - inputFormat.setDelimiter(lineBreakerSetup); - - FileInputSplit[] splits = inputFormat.createInputSplits(1); - - inputFormat.open(splits[0]); - - Record record = new Record(); - - Record result = inputFormat.nextRecord(record); - - assertNotNull("Expecting to not return null", result); - - - - assertEquals(FIRST_PART, result.getField(0, StringValue.class).getValue()); - - result = inputFormat.nextRecord(record); - - assertNotNull("Expecting to not return null", result); - assertEquals(SECOND_PART, result.getField(0, StringValue.class).getValue()); - - } - catch (Throwable t) { - System.err.println("test failed with exception: " + t.getMessage()); - t.printStackTrace(System.err); - fail("Test erroneous"); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvOutputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvOutputFormatTest.java deleted file mode 100644 index 9eb794f..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvOutputFormatTest.java +++ /dev/null @@ -1,465 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.record.io; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; - -import org.junit.Assert; - -import org.apache.flink.api.java.record.io.CsvOutputFormat; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.Path; -import org.apache.flink.core.fs.FileSystem.WriteMode; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; -import org.apache.flink.types.StringValue; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class CsvOutputFormatTest { - - protected Configuration config; - - protected File tempFile; - - private final CsvOutputFormat format = new CsvOutputFormat(); - - // -------------------------------------------------------------------------------------------- - - @Before - public void setup() throws IOException { - this.tempFile = File.createTempFile("test_output", "tmp"); - this.format.setOutputFilePath(new Path(tempFile.toURI())); - this.format.setWriteMode(WriteMode.OVERWRITE); - } - - @After - public void setdown() throws Exception { - if (this.format != null) { - this.format.close(); - } - if (this.tempFile != null) { - this.tempFile.delete(); - } - } - - @Test - public void testConfigure() - { - try { - Configuration config = new Configuration(); - - // check missing number of fields - boolean validConfig = true; - try { - format.configure(config); - } catch(IllegalArgumentException iae) { - validConfig = false; - } catch(IllegalStateException ise) { - validConfig = false; - } - assertFalse(validConfig); - - // check missing file parser - config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2); - validConfig = true; - try { - format.configure(config); - } catch(IllegalArgumentException iae) { - validConfig = false; - } catch(IllegalStateException ise) { - validConfig = false; - } - assertFalse(validConfig); - - // check valid config - config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class); - config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, IntValue.class); - validConfig = true; - try { - format.configure(config); - } catch(IllegalArgumentException iae) { - validConfig = false; - } - assertTrue(validConfig); - - // check invalid file parser config - config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 3); - validConfig = true; - try { - format.configure(config); - } catch(IllegalArgumentException iae) { - validConfig = false; - } - assertFalse(validConfig); - - // check valid config - config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 2, StringValue.class); - validConfig = true; - try { - format.configure(config); - } catch(IllegalArgumentException iae) { - validConfig = false; - } - assertTrue(validConfig); - - // check valid config - config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|"); - validConfig = true; - try { - format.configure(config); - } catch(IllegalArgumentException iae) { - validConfig = false; - System.out.println(iae.getMessage()); - } - assertTrue(validConfig); - - // check invalid text pos config - config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 0); - validConfig = true; - try { - format.configure(config); - } catch(IllegalArgumentException iae) { - validConfig = false; - } - assertFalse(validConfig); - - // check valid text pos config - config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 3); - config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 2, 9); - validConfig = true; - try { - format.configure(config); - } catch(IllegalArgumentException iae) { - validConfig = false; - } - assertTrue(validConfig); - } - catch (Exception ex) { - Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); - } - } - - @Test - public void testWriteNoRecPosNoLenient() - { - try { - Configuration config = new Configuration(); - config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|"); - config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2); - config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class); - config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, IntValue.class); - - format.configure(config); - - try { - format.open(0, 1); - } catch (IOException e) { - fail(e.getMessage()); - } - - Record r = new Record(2); - - try { - r.setField(0, new StringValue("Hello World")); - r.setField(1, new IntValue(42)); - format.writeRecord(r); - - r.setField(0, new StringValue("AbCdE")); - r.setField(1, new IntValue(13)); - format.writeRecord(r); - - format.close(); - - BufferedReader dis = new BufferedReader(new FileReader(tempFile)); - - assertTrue((dis.readLine()+"\n").equals("Hello World|42\n")); - assertTrue((dis.readLine()+"\n").equals("AbCdE|13\n")); - - dis.close(); - - } catch (IOException e) { - fail(e.getMessage()); - } - } - catch (Exception ex) { - Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); - } - } - - @Test - public void testWriteNoRecPosNoLenientFail() - { - try { - Configuration config = new Configuration(); - config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|"); - config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2); - config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class); - config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, IntValue.class); - - format.configure(config); - - try { - format.open(0, 1); - } catch (IOException e) { - fail(e.getMessage()); - } - - Record r = new Record(2); - - boolean success = true; - - try { - r.setField(0, new StringValue("Hello World")); - r.setField(1, new IntValue(42)); - format.writeRecord(r); - - r.setNull(0); - r.setField(1, new IntValue(13)); - format.writeRecord(r); - - format.close(); - - } catch (IOException e) { - success = false; - } catch (RuntimeException re) { - success = false; - } - - assertFalse(success); - } - catch (Exception ex) { - Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); - } - } - - @Test - public void testWriteNoRecPosLenient() - { - try { - Configuration config = new Configuration(); - config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|"); - config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2); - config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class); - config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, IntValue.class); - config.setBoolean(CsvOutputFormat.LENIENT_PARSING, true); - - format.configure(config); - - try { - format.open(0, 1); - } catch (IOException e) { - fail(e.getMessage()); - } - - Record r = new Record(2); - - try { - r.setField(0, new StringValue("Hello World")); - r.setField(1, new IntValue(42)); - format.writeRecord(r); - - r.setNull(0); - r.setField(1, new IntValue(13)); - format.writeRecord(r); - - format.close(); - - BufferedReader dis = new BufferedReader(new FileReader(tempFile)); - - assertTrue((dis.readLine()+"\n").equals("Hello World|42\n")); - assertTrue((dis.readLine()+"\n").equals("|13\n")); - - dis.close(); - - } catch (IOException e) { - fail(e.getMessage()); - } - } - catch (Exception ex) { - Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); - } - } - - @Test - public void testWriteRecPosNoLenient() - { - try { - Configuration config = new Configuration(); - config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|"); - config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2); - config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class); - config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 2); - config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, StringValue.class); - config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 0); - - format.configure(config); - - try { - format.open(0, 1); - } catch (IOException e) { - fail(e.getMessage()); - } - - Record r = new Record(2); - - try { - r.setField(0, new StringValue("Hello World")); - r.setField(1, new IntValue(42)); - r.setField(2, new StringValue("Hello User")); - format.writeRecord(r); - - r.setField(0, new StringValue("AbCdE")); - r.setField(1, new IntValue(13)); - r.setField(2, new StringValue("ZyXvW")); - format.writeRecord(r); - - format.close(); - - BufferedReader dis = new BufferedReader(new FileReader(tempFile)); - - assertTrue((dis.readLine()+"\n").equals("Hello User|Hello World\n")); - assertTrue((dis.readLine()+"\n").equals("ZyXvW|AbCdE\n")); - - dis.close(); - - } catch (IOException e) { - fail(e.getMessage()); - } - } - catch (Exception ex) { - Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); - } - } - - @Test - public void testWriteRecPosNoLenientFail() - { - try { - Configuration config = new Configuration(); - config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|"); - config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2); - config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class); - config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 2); - config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, StringValue.class); - config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 0); - - format.configure(config); - - try { - format.open(0, 1); - } catch (IOException e) { - fail(e.getMessage()); - } - - Record r = new Record(2); - - boolean success = true; - - try { - r.setField(0, new StringValue("Hello World")); - r.setField(1, new IntValue(42)); - r.setField(2, new StringValue("Hello User")); - format.writeRecord(r); - - r = new Record(); - - r.setField(0, new StringValue("AbCdE")); - r.setField(1, new IntValue(13)); - format.writeRecord(r); - - format.close(); - - } catch (IOException e) { - success = false; - } catch (RuntimeException re) { - success = false; - } - - assertFalse(success); - } - catch (Exception ex) { - Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); - } - } - - @Test - public void testWriteRecPosLenient() - { - try { - Configuration config = new Configuration(); - config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|"); - config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2); - config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class); - config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 2); - config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, StringValue.class); - config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 0); - config.setBoolean(CsvOutputFormat.LENIENT_PARSING, true); - - format.configure(config); - - try { - format.open(0, 1); - } catch (IOException e) { - fail(e.getMessage()); - } - - Record r = new Record(2); - - try { - r.setField(0, new StringValue("Hello World")); - r.setField(1, new IntValue(42)); - r.setField(2, new StringValue("Hello User")); - format.writeRecord(r); - - r = new Record(); - - r.setField(0, new StringValue("AbCdE")); - r.setField(1, new IntValue(13)); - format.writeRecord(r); - - format.close(); - - BufferedReader dis = new BufferedReader(new FileReader(tempFile)); - - assertTrue((dis.readLine()+"\n").equals("Hello User|Hello World\n")); - assertTrue((dis.readLine()+"\n").equals("|AbCdE\n")); - - dis.close(); - - } catch (IOException e) { - fail(e.getMessage()); - } - } - catch (Exception ex) { - Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); - } - } - -} - http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java deleted file mode 100644 index 4aec38e..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java +++ /dev/null @@ -1,298 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.record.io; - -import java.io.IOException; - -import org.junit.Assert; -import org.apache.flink.api.common.io.DefaultInputSplitAssigner; -import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.api.java.record.io.ExternalProcessFixedLengthInputFormat; -import org.apache.flink.api.java.record.io.ExternalProcessInputFormat; -import org.apache.flink.api.java.record.io.ExternalProcessInputSplit; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.io.GenericInputSplit; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; -import org.apache.flink.util.OperatingSystem; -import org.junit.Before; -import org.junit.Test; - -public class ExternalProcessFixedLengthInputFormatTest { - -private ExternalProcessFixedLengthInputFormat<ExternalProcessInputSplit> format; - - private final String neverEndingCommand = "cat /dev/urandom"; - private final String thousandRecordsCommand = "dd if=/dev/zero bs=8 count=1000"; - private final String incompleteRecordsCommand = "dd if=/dev/zero bs=7 count=2"; - private final String failingCommand = "ls /I/do/not/exist"; - - @Before - public void prepare() { - format = new MyExternalProcessTestInputFormat(); - } - - @Test - public void testOpen() { - - if(OperatingSystem.isWindows()) { - return; - } - - Configuration config = new Configuration(); - config.setInteger(ExternalProcessFixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8); - ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, this.neverEndingCommand); - - boolean processDestroyed = false; - try { - format.configure(config); - format.open(split); - - String[] cmd = {"/bin/sh","-c","ps aux | grep -v grep | grep \"cat /dev/urandom\" | wc -l"}; - - byte[] wcOut = new byte[128]; - Process p = Runtime.getRuntime().exec(cmd); - p.getInputStream().read(wcOut); - int pCnt = Integer.parseInt(new String(wcOut).trim()); - Assert.assertTrue(pCnt > 0); - - format.close(); - } catch (IOException e) { - Assert.fail(); - } catch (RuntimeException e) { - if(e.getMessage().equals("External process was destroyed although stream was not fully read.")) { - processDestroyed = true; - } - } finally { - Assert.assertTrue(processDestroyed); - } - } - - @Test - public void testCheckExitCode() { - - if(OperatingSystem.isWindows()) { - return; - } - - Configuration config = new Configuration(); - config.setInteger(ExternalProcessFixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8); - ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, failingCommand); - - format.configure(config); - boolean invalidExitCode = false; - try { - format.open(split); - format.waitForProcessToFinish(); - format.close(); - } catch (IOException e) { - Assert.fail(); - } catch (InterruptedException e) { - Assert.fail(); - } catch (RuntimeException e) { - if(e.getMessage().startsWith("External process did not finish with an allowed exit code:")) { - invalidExitCode = true; - } - } - Assert.assertTrue(invalidExitCode); - - invalidExitCode = false; - config.setString(ExternalProcessInputFormat.ALLOWEDEXITCODES_PARAMETER_KEY,"0,1,2"); - format.configure(config); - try { - format.open(split); - // wait for process to start... - Thread.sleep(100); - format.close(); - } catch (IOException e) { - Assert.fail(); - } catch (InterruptedException e) { - Assert.fail(); - } catch (RuntimeException e) { - if(e.getMessage().startsWith("External process did not finish with an allowed exit code:")) { - invalidExitCode = true; - } - } - Assert.assertTrue(!invalidExitCode); - - } - - @Test - public void testUserCodeTermination() { - - if(OperatingSystem.isWindows()) { - return; - } - - Configuration config = new Configuration(); - config.setInteger(ExternalProcessFixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8); - config.setInteger(MyExternalProcessTestInputFormat.FAILCOUNT_PARAMETER_KEY, 100); - ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, this.neverEndingCommand); - Record record = new Record(); - - boolean userException = false; - boolean processDestroyed = false; - try { - format.configure(config); - format.open(split); - while(!format.reachedEnd()) { - try { - format.nextRecord(record); - } catch(RuntimeException re) { - userException = true; - break; - } - } - format.close(); - } catch (IOException e) { - Assert.fail(); - } catch (RuntimeException e) { - if(e.getMessage().equals("External process was destroyed although stream was not fully read.")) { - processDestroyed = true; - } - } finally { - Assert.assertTrue(userException && processDestroyed); - } - } - - @Test - public void testReadStream() { - - if(OperatingSystem.isWindows()) { - return; - } - - Configuration config = new Configuration(); - config.setInteger(ExternalProcessFixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8); - ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, this.thousandRecordsCommand); - Record record = new Record(); - - int cnt = 0; - try { - format.configure(config); - format.open(split); - while(!format.reachedEnd()) { - if (format.nextRecord(record) != null) { - cnt++; - } - } - format.close(); - } catch (IOException e) { - Assert.fail(); - } catch (RuntimeException e) { - Assert.fail(e.getMessage()); - } - Assert.assertTrue(cnt == 1000); - } - - @Test - public void testReadInvalidStream() { - - if(OperatingSystem.isWindows()) { - return; - } - - Configuration config = new Configuration(); - config.setInteger(ExternalProcessFixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8); - ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, this.incompleteRecordsCommand); - Record record = new Record(); - - boolean incompleteRecordDetected = false; - @SuppressWarnings("unused") - int cnt = 0; - try { - format.configure(config); - format.open(split); - while(!format.reachedEnd()) { - if (format.nextRecord(record) != null) { - cnt++; - } - } - format.close(); - } catch (IOException e) { - Assert.fail(); - } catch (RuntimeException e) { - if(e.getMessage().equals("External process produced incomplete record")) { - incompleteRecordDetected = true; - } else { - Assert.fail(e.getMessage()); - } - } - Assert.assertTrue(incompleteRecordDetected); - } - - private final class MyExternalProcessTestInputFormat extends ExternalProcessFixedLengthInputFormat<ExternalProcessInputSplit> { - private static final long serialVersionUID = 1L; - - public static final String FAILCOUNT_PARAMETER_KEY = "test.failingCount"; - - private long cnt = 0; - private int failCnt; - - @Override - public void configure(Configuration parameters) { - super.configure(parameters); - failCnt = parameters.getInteger(FAILCOUNT_PARAMETER_KEY, Integer.MAX_VALUE); - } - - @Override - public boolean readBytes(Record record, byte[] bytes, int startPos) { - - if(cnt == failCnt) { - throw new RuntimeException("This is a test exception!"); - } - - int v1 = 0; - v1 = v1 | (0xFF & bytes[startPos+0]); - v1 = (v1 << 8) | (0xFF & bytes[startPos+1]); - v1 = (v1 << 8) | (0xFF & bytes[startPos+2]); - v1 = (v1 << 8) | (0xFF & bytes[startPos+3]); - - int v2 = 0; - v2 = v2 | (0xFF & bytes[startPos+4]); - v2 = (v2 << 8) | (0xFF & bytes[startPos+5]); - v2 = (v2 << 8) | (0xFF & bytes[startPos+6]); - v2 = (v2 << 8) | (0xFF & bytes[startPos+7]); - - record.setField(0,new IntValue(v1)); - record.setField(1,new IntValue(v2)); - - cnt++; - - return true; - } - - @Override - public ExternalProcessInputSplit[] createInputSplits(int minNumSplits) - throws IOException { - return null; - } - - @Override - public DefaultInputSplitAssigner getInputSplitAssigner(GenericInputSplit[] splits) { - return new DefaultInputSplitAssigner(splits); - } - - @Override - public BaseStatistics getStatistics(BaseStatistics cachedStatistics) { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java deleted file mode 100644 index 6b8cacb..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java +++ /dev/null @@ -1,283 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.record.io; - -import java.io.IOException; - -import org.junit.Assert; -import org.apache.flink.api.common.io.DefaultInputSplitAssigner; -import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.api.java.record.io.ExternalProcessInputFormat; -import org.apache.flink.api.java.record.io.ExternalProcessInputSplit; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.io.GenericInputSplit; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; -import org.apache.flink.util.OperatingSystem; -import org.junit.Before; -import org.junit.Test; - -public class ExternalProcessInputFormatTest { - - private ExternalProcessInputFormat<ExternalProcessInputSplit> format; - - private final String neverEndingCommand = "cat /dev/urandom"; - private final String thousandRecordsCommand = "dd if=/dev/zero bs=8 count=1000"; - private final String failingCommand = "ls /I/do/not/exist"; - - @Before - public void prepare() { - format = new MyExternalProcessTestInputFormat(); - } - - @Test - public void testOpen() { - - if(OperatingSystem.isWindows()) { - return; - } - - Configuration config = new Configuration(); - ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, this.neverEndingCommand); - - boolean processDestroyed = false; - try { - format.configure(config); - format.open(split); - - String[] cmd = {"/bin/sh","-c","ps aux | grep -v grep | grep \"cat /dev/urandom\" | wc -l"}; - - byte[] wcOut = new byte[128]; - Process p = Runtime.getRuntime().exec(cmd); - p.getInputStream().read(wcOut); - int pCnt = Integer.parseInt(new String(wcOut).trim()); - Assert.assertTrue(pCnt > 0); - - format.close(); - } catch (IOException e) { - Assert.fail(); - } catch (RuntimeException e) { - if(e.getMessage().equals("External process was destroyed although stream was not fully read.")) { - processDestroyed = true; - } - } finally { - Assert.assertTrue(processDestroyed); - } - } - - @Test - public void testCheckExitCode() { - - if(OperatingSystem.isWindows()) { - return; - } - - Configuration config = new Configuration(); - ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, failingCommand); - - format.configure(config); - boolean invalidExitCode = false; - try { - format.open(split); - format.waitForProcessToFinish(); - format.close(); - } catch (IOException e) { - Assert.fail(); - } catch (InterruptedException e) { - Assert.fail(); - } catch (RuntimeException e) { - if(e.getMessage().startsWith("External process did not finish with an allowed exit code:")) { - invalidExitCode = true; - } - } - Assert.assertTrue(invalidExitCode); - - invalidExitCode = false; - config.setString(ExternalProcessInputFormat.ALLOWEDEXITCODES_PARAMETER_KEY,"0,1,2"); - format.configure(config); - try { - format.open(split); - format.waitForProcessToFinish(); - format.close(); - } catch (IOException e) { - Assert.fail(); - } catch (InterruptedException e) { - Assert.fail(); - } catch (RuntimeException e) { - if(e.getMessage().startsWith("External process did not finish with an allowed exit code:")) { - invalidExitCode = true; - } - } - Assert.assertTrue(!invalidExitCode); - - } - - @Test - public void testUserCodeTermination() { - - if(OperatingSystem.isWindows()) { - return; - } - - Configuration config = new Configuration(); - config.setInteger(MyExternalProcessTestInputFormat.FAILCOUNT_PARAMETER_KEY, 100); - ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, this.neverEndingCommand); - Record record = new Record(); - - boolean userException = false; - boolean processDestroyed = false; - try { - format.configure(config); - format.open(split); - while(!format.reachedEnd()) { - try { - format.nextRecord(record); - } catch(RuntimeException re) { - userException = true; - break; - } - } - format.close(); - } catch (IOException e) { - Assert.fail(); - } catch (RuntimeException e) { - if(e.getMessage().equals("External process was destroyed although stream was not fully read.")) { - processDestroyed = true; - } - } finally { - Assert.assertTrue(userException && processDestroyed); - } - } - - @Test - public void testReadStream() { - - if(OperatingSystem.isWindows()) { - return; - } - - Configuration config = new Configuration(); - ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, this.thousandRecordsCommand); - Record record = new Record(); - - int cnt = 0; - try { - format.configure(config); - format.open(split); - while(!format.reachedEnd()) { - if (format.nextRecord(record) != null) { - cnt++; - } - } - format.close(); - } catch (IOException e) { - Assert.fail(); - } catch (RuntimeException e) { - Assert.fail(e.getMessage()); - } - Assert.assertTrue("Expected read count was 1000, actual read count was "+cnt, cnt == 1000); - } - - private final class MyExternalProcessTestInputFormat extends ExternalProcessInputFormat<ExternalProcessInputSplit> { - private static final long serialVersionUID = 1L; - - public static final String FAILCOUNT_PARAMETER_KEY = "test.failingCount"; - - private byte[] buf = new byte[8]; - - private long cnt = 0; - private int failCnt; - private boolean end; - - @Override - public void configure(Configuration parameters) { - super.configure(parameters); - failCnt = parameters.getInteger(FAILCOUNT_PARAMETER_KEY, Integer.MAX_VALUE); - } - - @Override - public void open(GenericInputSplit split) throws IOException { - super.open(split); - - this.end = false; - } - - @Override - public ExternalProcessInputSplit[] createInputSplits(int minNumSplits) { - return null; - } - - @Override - public DefaultInputSplitAssigner getInputSplitAssigner(GenericInputSplit[] splits) { - return new DefaultInputSplitAssigner(splits); - } - - @Override - public BaseStatistics getStatistics(BaseStatistics cachedStatistics) { - return null; - } - - @Override - public Record nextRecord(Record reuse) throws IOException { - - if(cnt > failCnt) { - throw new RuntimeException("This is a test exception!"); - } - - int totalReadCnt = 0; - - do { - int readCnt = super.extProcOutStream.read(buf, totalReadCnt, buf.length-totalReadCnt); - - if(readCnt == -1) { - this.end = true; - return null; - } else { - totalReadCnt += readCnt; - } - - } while(totalReadCnt != 8); - - int v1 = 0; - v1 = v1 | (0xFF & buf[0]); - v1 = (v1 << 8) | (0xFF & buf[1]); - v1 = (v1 << 8) | (0xFF & buf[2]); - v1 = (v1 << 8) | (0xFF & buf[3]); - - int v2 = 0; - v2 = v2 | (0xFF & buf[4]); - v2 = (v2 << 8) | (0xFF & buf[5]); - v2 = (v2 << 8) | (0xFF & buf[6]); - v2 = (v2 << 8) | (0xFF & buf[7]); - - reuse.setField(0,new IntValue(v1)); - reuse.setField(1,new IntValue(v2)); - - this.cnt++; - - return reuse; - } - - @Override - public boolean reachedEnd() throws IOException { - return this.end; - } - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/io/FixedLenghtInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/FixedLenghtInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/FixedLenghtInputFormatTest.java deleted file mode 100644 index 3d441e6..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/FixedLenghtInputFormatTest.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.record.io; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.Path; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class FixedLenghtInputFormatTest { - - protected Configuration config; - - protected File tempFile; - - private final FixedLengthInputFormat format = new MyFixedLengthInputFormat(); - - // -------------------------------------------------------------------------------------------- - - @Before - public void setup() { - format.setFilePath("file:///some/file/that/will/not/be/read"); - } - - @After - public void setdown() throws Exception { - if (this.format != null) { - this.format.close(); - } - if (this.tempFile != null) { - this.tempFile.delete(); - } - } - - @Test - public void testOpen() throws IOException { - final int[] fileContent = {1,2,3,4,5,6,7,8}; - final FileInputSplit split = createTempFile(fileContent); - - final Configuration parameters = new Configuration(); - parameters.setInteger(FixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8); - - format.configure(parameters); - format.open(split); - assertEquals(0, format.getSplitStart()); - assertEquals(0, format.getReadBufferSize() % 8); - format.close(); - - parameters.setInteger(FixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 13); - format.configure(parameters); - format.close(); - format.open(split); - assertEquals(0, format.getReadBufferSize() % 13); - format.close(); - - parameters.setInteger(FixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 27); - format.configure(parameters); - format.close(); - format.open(split); - assertEquals(0, format.getReadBufferSize() % 27); - format.close(); - - } - - @Test - public void testRead() throws IOException { - final int[] fileContent = {1,2,3,4,5,6,7,8}; - final FileInputSplit split = createTempFile(fileContent); - - final Configuration parameters = new Configuration(); - - parameters.setInteger(FixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8); - - format.configure(parameters); - format.open(split); - - Record record = new Record(); - - assertNotNull(format.nextRecord(record)); - assertEquals(1, record.getField(0, IntValue.class).getValue()); - assertEquals(2, record.getField(1, IntValue.class).getValue()); - - assertNotNull(format.nextRecord(record)); - assertEquals(3, record.getField(0, IntValue.class).getValue()); - assertEquals(4, record.getField(1, IntValue.class).getValue()); - - assertNotNull(format.nextRecord(record)); - assertEquals(5, record.getField(0, IntValue.class).getValue()); - assertEquals(6, record.getField(1, IntValue.class).getValue()); - - assertNotNull(format.nextRecord(record)); - assertEquals(7, record.getField(0, IntValue.class).getValue()); - assertEquals(8, record.getField(1, IntValue.class).getValue()); - - assertNull(format.nextRecord(record)); - assertTrue(format.reachedEnd()); - } - - - @Test - public void testReadFail() throws IOException { - final int[] fileContent = {1,2,3,4,5,6,7,8,9}; - final FileInputSplit split = createTempFile(fileContent); - - final Configuration parameters = new Configuration(); - parameters.setInteger(FixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8); - - format.configure(parameters); - format.open(split); - - Record record = new Record(); - - try { - assertNotNull(format.nextRecord(record)); - assertEquals(1, record.getField(0, IntValue.class).getValue()); - assertEquals(2, record.getField(1, IntValue.class).getValue()); - - assertNotNull(format.nextRecord(record)); - assertEquals(3, record.getField(0, IntValue.class).getValue()); - assertEquals(4, record.getField(1, IntValue.class).getValue()); - - assertNotNull(format.nextRecord(record)); - assertEquals(5, record.getField(0, IntValue.class).getValue()); - assertEquals(6, record.getField(1, IntValue.class).getValue()); - - assertNotNull(format.nextRecord(record)); - assertEquals(7, record.getField(0, IntValue.class).getValue()); - assertEquals(8, record.getField(1, IntValue.class).getValue()); - - assertNull(format.nextRecord(record)); - } catch(IOException ioe) { - assertTrue(ioe.getMessage().equals("Unable to read full record")); - } - } - - - private FileInputSplit createTempFile(int[] contents) throws IOException { - this.tempFile = File.createTempFile("test_contents", "tmp"); - this.tempFile.deleteOnExit(); - - DataOutputStream dos = new DataOutputStream(new FileOutputStream(tempFile)); - - for(int i : contents) { - dos.writeInt(i); - } - - dos.close(); - - return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"}); - } - - - private final class MyFixedLengthInputFormat extends FixedLengthInputFormat { - private static final long serialVersionUID = 1L; - - IntValue p1 = new IntValue(); - IntValue p2 = new IntValue(); - - @Override - public boolean readBytes(Record target, byte[] buffer, int startPos) { - int v1 = 0; - v1 = (v1 | buffer[startPos+0]) << 8; - v1 = (v1 | buffer[startPos+1]) << 8; - v1 = (v1 | buffer[startPos+2]) << 8; - v1 = (v1 | buffer[startPos+3]); - p1.setValue(v1); - - int v2 = 0; - v2 = (v2 | buffer[startPos+4]) << 8; - v2 = (v2 | buffer[startPos+5]) << 8; - v2 = (v2 | buffer[startPos+6]) << 8; - v2 = (v2 | buffer[startPos+7]); - p2.setValue(v2); - - target.setField(0, p1); - target.setField(1, p2); - - return true; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/io/TextInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/TextInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/TextInputFormatTest.java deleted file mode 100644 index 8ca19cf..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/TextInputFormatTest.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.record.io; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.FileWriter; -import java.io.OutputStreamWriter; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.types.Record; -import org.apache.flink.types.StringValue; -import org.junit.Test; - -public class TextInputFormatTest { - /** - * The TextInputFormat seems to fail reading more than one record. I guess its - * an off by one error. - * - * The easiest workaround is to setParameter(TextInputFormat.CHARSET_NAME, "ASCII"); - */ - @Test - public void testPositionBug() { - final String FIRST = "First line"; - final String SECOND = "Second line"; - - try { - // create input file - File tempFile = File.createTempFile("TextInputFormatTest", "tmp"); - tempFile.deleteOnExit(); - tempFile.setWritable(true); - - FileWriter writer = new FileWriter(tempFile); - writer.append(FIRST).append('\n'); - writer.append(SECOND).append('\n'); - writer.close(); - - TextInputFormat inputFormat = new TextInputFormat(); - inputFormat.setFilePath(tempFile.toURI().toString()); - - Configuration parameters = new Configuration(); - inputFormat.configure(parameters); - - FileInputSplit[] splits = inputFormat.createInputSplits(1); - assertTrue("expected at least one input split", splits.length >= 1); - - inputFormat.open(splits[0]); - - Record r = new Record(); - assertNotNull("Expecting first record here", inputFormat.nextRecord(r)); - assertEquals(FIRST, r.getField(0, StringValue.class).getValue()); - - assertNotNull("Expecting second record here",inputFormat.nextRecord(r )); - assertEquals(SECOND, r.getField(0, StringValue.class).getValue()); - - assertNull("The input file is over", inputFormat.nextRecord(r)); - } - catch (Throwable t) { - System.err.println("test failed with exception: " + t.getMessage()); - t.printStackTrace(System.err); - fail("Test erroneous"); - } - } - - - /** - * This tests cases when line ends with \r\n and \n is used as delimiter, the last \r should be removed - */ - @Test - public void testRemovingTrailingCR() { - - testRemovingTrailingCR("\n","\n"); - testRemovingTrailingCR("\r\n","\n"); - - testRemovingTrailingCR("|","|"); - testRemovingTrailingCR("|","\n"); - - } - - private void testRemovingTrailingCR(String lineBreaker,String delimiter) { - File tempFile; - - String FIRST = "First line"; - String SECOND = "Second line"; - String CONTENT = FIRST + lineBreaker + SECOND + lineBreaker; - - try { - // create input file - tempFile = File.createTempFile("TextInputFormatTest", "tmp"); - tempFile.deleteOnExit(); - tempFile.setWritable(true); - - OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile)); - wrt.write(CONTENT); - wrt.close(); - - TextInputFormat inputFormat = new TextInputFormat(); - inputFormat.setFilePath(tempFile.toURI().toString()); - - Configuration parameters = new Configuration(); - inputFormat.configure(parameters); - - inputFormat.setDelimiter(delimiter); - - FileInputSplit[] splits = inputFormat.createInputSplits(1); - - inputFormat.open(splits[0]); - - Record r = new Record(); - if ( (delimiter.equals("\n") && (lineBreaker.equals("\n") || lineBreaker.equals("\r\n") ) ) - || (lineBreaker.equals(delimiter)) ){ - - assertNotNull("Expecting first record here", inputFormat.nextRecord(r)); - assertEquals(FIRST, r.getField(0, StringValue.class).getValue()); - - assertNotNull("Expecting second record here",inputFormat.nextRecord(r )); - assertEquals(SECOND, r.getField(0, StringValue.class).getValue()); - - assertNull("The input file is over", inputFormat.nextRecord(r)); - } else { - assertNotNull("Expecting first record here", inputFormat.nextRecord(r)); - assertEquals(CONTENT, r.getField(0, StringValue.class).getValue()); - } - - - } - catch (Throwable t) { - System.err.println("test failed with exception: " + t.getMessage()); - t.printStackTrace(System.err); - fail("Test erroneous"); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java index 5ff9eaf..78d61d1 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java @@ -169,8 +169,7 @@ public abstract class CostEstimator { switch (n.getDriverStrategy()) { case NONE: case UNARY_NO_OP: - case BINARY_NO_OP: - case COLLECTOR_MAP: + case BINARY_NO_OP: case MAP: case MAP_PARTITION: case FLAT_MAP: http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java deleted file mode 100644 index 9c1bcd3..0000000 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.dag; - -import java.util.Collections; -import java.util.List; - -import org.apache.flink.api.common.operators.SingleInputOperator; -import org.apache.flink.optimizer.DataStatistics; -import org.apache.flink.optimizer.operators.CollectorMapDescriptor; -import org.apache.flink.optimizer.operators.OperatorDescriptorSingle; - -/** - * The optimizer's internal representation of a <i>Map</i> operator node. - */ -public class CollectorMapNode extends SingleInputNode { - - private final List<OperatorDescriptorSingle> possibleProperties; - - - public CollectorMapNode(SingleInputOperator<?, ?, ?> operator) { - super(operator); - - this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new CollectorMapDescriptor()); - } - - @Override - public String getOperatorName() { - return "Map"; - } - - @Override - protected List<OperatorDescriptorSingle> getPossibleProperties() { - return this.possibleProperties; - } - - /** - * Computes the estimates for the Map operator. Map takes one value and transforms it into another value. - * The cardinality consequently stays the same. - */ - @Override - protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { - this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java deleted file mode 100644 index bcd4d73..0000000 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.operators; - -import java.util.Collections; -import java.util.List; - -import org.apache.flink.optimizer.dag.SingleInputNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.PartitioningProperty; -import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.runtime.operators.DriverStrategy; - - -public class CollectorMapDescriptor extends OperatorDescriptorSingle { - - @Override - public DriverStrategy getStrategy() { - return DriverStrategy.COLLECTOR_MAP; - } - - @Override - public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { - return new SingleInputPlanNode(node, "Map ("+node.getOperator().getName()+")", in, DriverStrategy.COLLECTOR_MAP); - } - - @Override - protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { - RequestedGlobalProperties rgp = new RequestedGlobalProperties(); - rgp.setAnyDistribution(); - return Collections.singletonList(rgp); - } - - @Override - protected List<RequestedLocalProperties> createPossibleLocalProperties() { - return Collections.singletonList(new RequestedLocalProperties()); - } - - @Override - public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { - if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 && - gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) - { - gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList()); - } - gProps.clearUniqueFieldCombinations(); - return gProps; - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties lProps) { - return lProps.clearUniqueFieldSets(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java index aaabac5..fc5eb21 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java @@ -418,7 +418,6 @@ public class PlanJSONDumpGenerator { locString = "No-Op"; break; - case COLLECTOR_MAP: case MAP: locString = "Map"; break; http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java index d5ddf4d..1125c29 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java @@ -57,7 +57,6 @@ public class JsonMapper { case UNARY_NO_OP: return "No-Op"; - case COLLECTOR_MAP: case MAP: return "Map"; http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java index bcdee14..3f3eae1 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java @@ -47,7 +47,6 @@ import org.apache.flink.optimizer.dag.BulkIterationNode; import org.apache.flink.optimizer.dag.BulkPartialSolutionNode; import org.apache.flink.optimizer.dag.CoGroupNode; import org.apache.flink.optimizer.dag.CoGroupRawNode; -import org.apache.flink.optimizer.dag.CollectorMapNode; import org.apache.flink.optimizer.dag.CrossNode; import org.apache.flink.optimizer.dag.DagConnection; import org.apache.flink.optimizer.dag.DataSinkNode; @@ -144,9 +143,6 @@ public class GraphCreatingVisitor implements Visitor<Operator<?>> { else if (c instanceof MapPartitionOperatorBase) { n = new MapPartitionNode((MapPartitionOperatorBase<?, ?, ?>) c); } - else if (c instanceof org.apache.flink.api.common.operators.base.CollectorMapOperatorBase) { - n = new CollectorMapNode((org.apache.flink.api.common.operators.base.CollectorMapOperatorBase<?, ?, ?>) c); - } else if (c instanceof FlatMapOperatorBase) { n = new FlatMapNode((FlatMapOperatorBase<?, ?, ?>) c); } http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java deleted file mode 100644 index 60bc798..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.operators; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.GenericCollectorMap; -import org.apache.flink.util.Collector; -import org.apache.flink.util.MutableObjectIterator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Map task which is executed by a Task Manager. The task has a single - * input and one or multiple outputs. It is provided with a MapFunction - * implementation. - * <p> - * The MapTask creates an iterator over all key-value pairs of its input and hands that to the <code>map()</code> method - * of the MapFunction. - * - * @see GenericCollectorMap - * - * @param <IT> The mapper's input data type. - * @param <OT> The mapper's output data type. - */ -@SuppressWarnings("deprecation") -public class CollectorMapDriver<IT, OT> implements Driver<GenericCollectorMap<IT, OT>, OT> { - - private static final Logger LOG = LoggerFactory.getLogger(CollectorMapDriver.class); - - - private TaskContext<GenericCollectorMap<IT, OT>, OT> taskContext; - - private volatile boolean running; - - private boolean objectReuseEnabled = false; - - @Override - public void setup(TaskContext<GenericCollectorMap<IT, OT>, OT> context) { - this.taskContext = context; - this.running = true; - } - - @Override - public int getNumberOfInputs() { - return 1; - } - - @Override - public Class<GenericCollectorMap<IT, OT>> getStubType() { - @SuppressWarnings("unchecked") - final Class<GenericCollectorMap<IT, OT>> clazz = (Class<GenericCollectorMap<IT, OT>>) (Class<?>) GenericCollectorMap.class; - return clazz; - } - - @Override - public int getNumberOfDriverComparators() { - return 0; - } - - @Override - public void prepare() { - ExecutionConfig executionConfig = taskContext.getExecutionConfig(); - this.objectReuseEnabled = executionConfig.isObjectReuseEnabled(); - - if (LOG.isDebugEnabled()) { - LOG.debug("CollectorMapDriver object reuse: " + (this.objectReuseEnabled ? "ENABLED" : "DISABLED") + "."); - } - } - - @Override - public void run() throws Exception { - // cache references on the stack - final MutableObjectIterator<IT> input = this.taskContext.getInput(0); - final GenericCollectorMap<IT, OT> stub = this.taskContext.getStub(); - final Collector<OT> output = this.taskContext.getOutputCollector(); - - if (objectReuseEnabled) { - IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance(); - - - while (this.running && ((record = input.next(record)) != null)) { - stub.map(record, output); - } - } else { - IT record; - while (this.running && ((record = input.next()) != null)) { - stub.map(record, output); - } - } - } - - @Override - public void cleanup() { - // mappers need no cleanup, since no strategies are used. - } - - @Override - public void cancel() { - this.running = false; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java index b069f12..12da126 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java @@ -23,7 +23,6 @@ import static org.apache.flink.runtime.operators.DamBehavior.MATERIALIZING; import static org.apache.flink.runtime.operators.DamBehavior.PIPELINED; import org.apache.flink.runtime.operators.chaining.ChainedAllReduceDriver; -import org.apache.flink.runtime.operators.chaining.ChainedCollectorMapDriver; import org.apache.flink.runtime.operators.chaining.ChainedDriver; import org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver; import org.apache.flink.runtime.operators.chaining.ChainedMapDriver; @@ -40,8 +39,6 @@ public enum DriverStrategy { // a binary no-op operator. non implementation available BINARY_NO_OP(null, null, PIPELINED, PIPELINED, 0), - // the old mapper - COLLECTOR_MAP(CollectorMapDriver.class, ChainedCollectorMapDriver.class, PIPELINED, 0), // the proper mapper MAP(MapDriver.class, ChainedMapDriver.class, PIPELINED, 0), http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java deleted file mode 100644 index 8900ed7..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.operators.chaining; - -import org.apache.flink.api.common.functions.RichFunction; -import org.apache.flink.api.common.functions.GenericCollectorMap; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.operators.BatchTask; - -@SuppressWarnings("deprecation") -public class ChainedCollectorMapDriver<IT, OT> extends ChainedDriver<IT, OT> { - - private GenericCollectorMap<IT, OT> mapper; - - // -------------------------------------------------------------------------------------------- - - @Override - public void setup(AbstractInvokable parent) { - @SuppressWarnings("unchecked") - final GenericCollectorMap<IT, OT> mapper = - BatchTask.instantiateUserCode(this.config, userCodeClassLoader, GenericCollectorMap.class); - this.mapper = mapper; - mapper.setRuntimeContext(getUdfRuntimeContext()); - } - - @Override - public void openTask() throws Exception { - Configuration stubConfig = this.config.getStubParameters(); - BatchTask.openUserCode(this.mapper, stubConfig); - } - - @Override - public void closeTask() throws Exception { - BatchTask.closeUserCode(this.mapper); - } - - @Override - public void cancelTask() { - try { - this.mapper.close(); - } catch (Throwable t) { - } - } - - // -------------------------------------------------------------------------------------------- - - public RichFunction getStub() { - return this.mapper; - } - - public String getTaskName() { - return this.taskName; - } - - // -------------------------------------------------------------------------------------------- - - @Override - public void collect(IT record) { - try { - this.mapper.map(record, this.outputCollector); - } catch (Exception ex) { - throw new ExceptionInChainedStubException(this.taskName, ex); - } - } - - @Override - public void close() { - this.outputCollector.close(); - } -}