http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf-common/src/main/java/org/apache/jena/hadoop/rdf/types/comparators/SimpleBinaryComparator.java ---------------------------------------------------------------------- diff --git a/hadoop-rdf-common/src/main/java/org/apache/jena/hadoop/rdf/types/comparators/SimpleBinaryComparator.java b/hadoop-rdf-common/src/main/java/org/apache/jena/hadoop/rdf/types/comparators/SimpleBinaryComparator.java deleted file mode 100644 index 6c46714..0000000 --- a/hadoop-rdf-common/src/main/java/org/apache/jena/hadoop/rdf/types/comparators/SimpleBinaryComparator.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.jena.hadoop.rdf.types.comparators; - -import org.apache.hadoop.io.WritableComparator; - -/** - * A general purpose comparator that may be used with any types which can be - * compared directly on their binary encodings - */ -public class SimpleBinaryComparator extends WritableComparator { - - @Override - public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { - return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2); - } - -}
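WritableComparator.compareBytes performs a lexicographic comparison of the two byte ranges treating each byte as unsigned, so SimpleBinaryComparator only yields a meaningful ordering for writables whose serialised form sorts the same way as their logical values. A minimal sketch of how such a comparator is typically put to use — the pairing with BytesWritable below is purely illustrative, not something this commit defines:

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.jena.hadoop.rdf.types.comparators.SimpleBinaryComparator;

public class ComparatorRegistration {
    public static void main(String[] args) {
        // Illustrative pairing only: make Hadoop sort BytesWritable keys by raw
        // byte comparison, without deserialising them during the shuffle.
        WritableComparator.define(BytesWritable.class, new SimpleBinaryComparator());
    }
}

Registering a raw comparator this way is what lets the shuffle sort keys straight from their bytes, which is the point of comparing on the binary encoding in the first place.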
http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf-common/src/main/java/org/apache/jena/hadoop/rdf/types/converters/ThriftConverter.java ---------------------------------------------------------------------- diff --git a/hadoop-rdf-common/src/main/java/org/apache/jena/hadoop/rdf/types/converters/ThriftConverter.java b/hadoop-rdf-common/src/main/java/org/apache/jena/hadoop/rdf/types/converters/ThriftConverter.java deleted file mode 100644 index 4d0ae92..0000000 --- a/hadoop-rdf-common/src/main/java/org/apache/jena/hadoop/rdf/types/converters/ThriftConverter.java +++ /dev/null @@ -1,124 +0,0 @@ -package org.apache.jena.hadoop.rdf.types.converters; - -import java.io.ByteArrayOutputStream; - -import org.apache.jena.riot.thrift.wire.RDF_Quad; -import org.apache.jena.riot.thrift.wire.RDF_Term; -import org.apache.jena.riot.thrift.wire.RDF_Triple; -import org.apache.thrift.TException; -import org.apache.thrift.protocol.TCompactProtocol; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.TIOStreamTransport; -import org.apache.thrift.transport.TMemoryInputTransport; -import org.apache.thrift.transport.TTransport; - -public class ThriftConverter { - - private static ThreadLocal<TMemoryInputTransport> inputTransports = new ThreadLocal<>(); - private static ThreadLocal<TProtocol> inputProtocols = new ThreadLocal<>(); - - private static ThreadLocal<ByteArrayOutputStream> outputStreams = new ThreadLocal<>(); - private static ThreadLocal<TTransport> outputTransports = new ThreadLocal<>(); - private static ThreadLocal<TProtocol> outputProtocols = new ThreadLocal<>(); - - private static TMemoryInputTransport getInputTransport() { - TMemoryInputTransport transport = inputTransports.get(); - if (transport != null) - return transport; - - transport = new TMemoryInputTransport(); - inputTransports.set(transport); - return transport; - } - - private static TProtocol getInputProtocol() { - TProtocol protocol = inputProtocols.get(); - if (protocol != null) - return protocol; - - protocol = new TCompactProtocol(getInputTransport()); - inputProtocols.set(protocol); - return protocol; - } - - private static ByteArrayOutputStream getOutputStream() { - ByteArrayOutputStream output = outputStreams.get(); - if (output != null) - return output; - - output = new ByteArrayOutputStream(); - outputStreams.set(output); - return output; - } - - private static TTransport getOutputTransport() { - TTransport transport = outputTransports.get(); - if (transport != null) - return transport; - - transport = new TIOStreamTransport(getOutputStream()); - outputTransports.set(transport); - return transport; - } - - private static TProtocol getOutputProtocol() { - TProtocol protocol = outputProtocols.get(); - if (protocol != null) - return protocol; - - protocol = new TCompactProtocol(getOutputTransport()); - outputProtocols.set(protocol); - return protocol; - } - - public static byte[] toBytes(RDF_Term term) throws TException { - ByteArrayOutputStream output = getOutputStream(); - output.reset(); - - TProtocol protocol = getOutputProtocol(); - term.write(protocol); - - return output.toByteArray(); - } - - public static void fromBytes(byte[] bs, RDF_Term term) throws TException { - TMemoryInputTransport transport = getInputTransport(); - transport.reset(bs); - TProtocol protocol = getInputProtocol(); - term.read(protocol); - } - - public static void fromBytes(byte[] buffer, RDF_Triple triple) throws TException { - TMemoryInputTransport transport = getInputTransport(); - 
transport.reset(buffer); - TProtocol protocol = getInputProtocol(); - triple.read(protocol); - } - - public static byte[] toBytes(RDF_Triple triple) throws TException { - ByteArrayOutputStream output = getOutputStream(); - output.reset(); - - TProtocol protocol = getOutputProtocol(); - triple.write(protocol); - - return output.toByteArray(); - } - - public static void fromBytes(byte[] buffer, RDF_Quad quad) throws TException { - TMemoryInputTransport transport = getInputTransport(); - transport.reset(buffer); - TProtocol protocol = getInputProtocol(); - quad.read(protocol); - } - - public static byte[] toBytes(RDF_Quad quad) throws TException { - ByteArrayOutputStream output = getOutputStream(); - output.reset(); - - TProtocol protocol = getOutputProtocol(); - quad.write(protocol); - - return output.toByteArray(); - } -} http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/CharacteristicTests.java ---------------------------------------------------------------------- diff --git a/hadoop-rdf-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/CharacteristicTests.java b/hadoop-rdf-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/CharacteristicTests.java deleted file mode 100644 index 7214b14..0000000 --- a/hadoop-rdf-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/CharacteristicTests.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
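The ThriftConverter above caches one transport/protocol pair per thread through its ThreadLocal fields, so repeated conversions reuse the same Thrift plumbing instead of allocating it on every call. A hedged round-trip sketch of its intended use, assuming the usual Thrift-generated all-args constructor and union setter on the wire classes (the IRI value is illustrative):

import org.apache.jena.hadoop.rdf.types.converters.ThriftConverter;
import org.apache.jena.riot.thrift.wire.RDF_IRI;
import org.apache.jena.riot.thrift.wire.RDF_Term;
import org.apache.thrift.TException;

public class ThriftConverterRoundTrip {
    public static void main(String[] args) throws TException {
        RDF_Term original = new RDF_Term();
        original.setIri(new RDF_IRI("http://example.org/subject")); // illustrative term

        // Encode via the thread-local compact protocol, then decode into a fresh term
        byte[] encoded = ThriftConverter.toBytes(original);
        RDF_Term decoded = new RDF_Term();
        ThriftConverter.fromBytes(encoded, decoded);

        System.out.println(original.equals(decoded)); // expected: true
    }
}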
- */ - -package org.apache.jena.hadoop.rdf.io.types; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.Iterator; - -import org.apache.jena.hadoop.rdf.types.CharacteristicSetWritable; -import org.apache.jena.hadoop.rdf.types.CharacteristicWritable; -import org.junit.Assert; -import org.junit.Test; - -import com.hp.hpl.jena.graph.Node; -import com.hp.hpl.jena.graph.NodeFactory; - -/** - * Tests for {@link CharacteristicWritable} and - * {@link CharacteristicSetWritable} - * - * - * - */ -public class CharacteristicTests { - - /** - * Checks whether a writable round trips successfully - * - * @param cw - * Characteristic writable - * @throws IOException - */ - private void checkRoundTrip(CharacteristicWritable cw) throws IOException { - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - DataOutputStream output = new DataOutputStream(outputStream); - cw.write(output); - - ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray()); - DataInputStream input = new DataInputStream(inputStream); - CharacteristicWritable actual = CharacteristicWritable.read(input); - Assert.assertEquals(cw, actual); - } - - /** - * Tests characteristic round tripping - * - * @throws IOException - */ - @Test - public void characteristic_writable_01() throws IOException { - Node n = NodeFactory.createURI("http://example.org"); - CharacteristicWritable expected = new CharacteristicWritable(n); - Assert.assertEquals(1, expected.getCount().get()); - - this.checkRoundTrip(expected); - } - - /** - * Tests characteristic properties - * - * @throws IOException - */ - @Test - public void characteristic_writable_02() throws IOException { - Node n = NodeFactory.createURI("http://example.org"); - CharacteristicWritable cw1 = new CharacteristicWritable(n); - CharacteristicWritable cw2 = new CharacteristicWritable(n, 100); - this.checkRoundTrip(cw1); - this.checkRoundTrip(cw2); - - // Should still be equal since equality is only on the node not the - // count - Assert.assertEquals(cw1, cw2); - } - - /** - * Tests characteristic properties - * - * @throws IOException - */ - @Test - public void characteristic_writable_03() throws IOException { - CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org")); - CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org/other")); - this.checkRoundTrip(cw1); - this.checkRoundTrip(cw2); - - // Should not be equal as different nodes - Assert.assertNotEquals(cw1, cw2); - } - - /** - * Checks that a writable round trips - * - * @param set - * Characteristic set - * @throws IOException - */ - private void checkRoundTrip(CharacteristicSetWritable set) throws IOException { - // Test round trip - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - DataOutputStream output = new DataOutputStream(outputStream); - set.write(output); - - ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray()); - DataInputStream input = new DataInputStream(inputStream); - CharacteristicSetWritable actual = CharacteristicSetWritable.read(input); - Assert.assertEquals(set, actual); - } - - /** - * Checks a characteristic set - * - * @param set - * Set - * @param expectedItems - * Expected number of characteristics - * @param expectedCounts - * Expected counts for characteristics - */ - protected final void 
checkCharacteristicSet(CharacteristicSetWritable set, int expectedItems, long[] expectedCounts) { - Assert.assertEquals(expectedItems, set.size()); - Assert.assertEquals(expectedItems, expectedCounts.length); - Iterator<CharacteristicWritable> iter = set.getCharacteristics(); - int i = 0; - while (iter.hasNext()) { - CharacteristicWritable cw = iter.next(); - Assert.assertEquals(expectedCounts[i], cw.getCount().get()); - i++; - } - } - - /** - * Tests characteristic sets - * - * @throws IOException - */ - @Test - public void characteristic_set_writable_01() throws IOException { - CharacteristicSetWritable set = new CharacteristicSetWritable(); - - // Add some characteristics - CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org")); - CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org/other")); - set.add(cw1); - set.add(cw2); - this.checkCharacteristicSet(set, 2, new long[] { 1, 1 }); - this.checkRoundTrip(set); - } - - /** - * Tests characteristic sets - * - * @throws IOException - */ - @Test - public void characteristic_set_writable_02() throws IOException { - CharacteristicSetWritable set = new CharacteristicSetWritable(); - - // Add some characteristics - CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org")); - CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"), 2); - set.add(cw1); - set.add(cw2); - this.checkCharacteristicSet(set, 1, new long[] { 3 }); - this.checkRoundTrip(set); - } - - /** - * Tests characteristic sets - * - * @throws IOException - */ - @Test - public void characteristic_set_writable_03() throws IOException { - CharacteristicSetWritable set1 = new CharacteristicSetWritable(); - CharacteristicSetWritable set2 = new CharacteristicSetWritable(); - - // Add some characteristics - CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org")); - CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org/other")); - set1.add(cw1); - set2.add(cw2); - this.checkCharacteristicSet(set1, 1, new long[] { 1 }); - this.checkCharacteristicSet(set2, 1, new long[] { 1 }); - this.checkRoundTrip(set1); - this.checkRoundTrip(set2); - - Assert.assertNotEquals(set1, set2); - } -} http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/RdfTypesTest.java ---------------------------------------------------------------------- diff --git a/hadoop-rdf-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/RdfTypesTest.java b/hadoop-rdf-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/RdfTypesTest.java deleted file mode 100644 index a70dfb0..0000000 --- a/hadoop-rdf-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/RdfTypesTest.java +++ /dev/null @@ -1,406 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
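The tests above pin down the semantics worth remembering: CharacteristicWritable equality considers only the node, not the count, and CharacteristicSetWritable.add merges characteristics for the same node by summing their counts. A small sketch of that merging behaviour, using only the API the tests themselves exercise:

import org.apache.jena.hadoop.rdf.types.CharacteristicSetWritable;
import org.apache.jena.hadoop.rdf.types.CharacteristicWritable;

import com.hp.hpl.jena.graph.NodeFactory;

public class CharacteristicMergeSketch {
    public static void main(String[] args) {
        CharacteristicSetWritable set = new CharacteristicSetWritable();
        set.add(new CharacteristicWritable(NodeFactory.createURI("http://example.org"), 1));
        set.add(new CharacteristicWritable(NodeFactory.createURI("http://example.org"), 2));

        // Equality is on the node alone, so the set holds a single
        // characteristic whose count is the sum 1 + 2 = 3
        System.out.println(set.size()); // expected: 1
    }
}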
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.jena.hadoop.rdf.io.types; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.IOException; - -import org.apache.hadoop.io.WritableComparable; -import org.apache.jena.atlas.lib.Tuple; -import org.apache.jena.hadoop.rdf.types.NodeTupleWritable; -import org.apache.jena.hadoop.rdf.types.NodeWritable; -import org.apache.jena.hadoop.rdf.types.QuadWritable; -import org.apache.jena.hadoop.rdf.types.TripleWritable; -import org.junit.Assert; -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.hp.hpl.jena.datatypes.xsd.XSDDatatype; -import com.hp.hpl.jena.graph.Node; -import com.hp.hpl.jena.graph.NodeFactory; -import com.hp.hpl.jena.graph.Triple; -import com.hp.hpl.jena.sparql.core.Quad; - -/** - * Tests for the various RDF types defined by the - * {@link org.apache.jena.hadoop.rdf.types} package - * - * - * - */ -public class RdfTypesTest { - - private static final Logger LOG = LoggerFactory.getLogger(RdfTypesTest.class); - - private ByteArrayOutputStream outputStream; - private ByteArrayInputStream inputStream; - - /** - * Prepare for output - * - * @return Data output - */ - private DataOutput prepareOutput() { - this.outputStream = new ByteArrayOutputStream(); - return new DataOutputStream(this.outputStream); - } - - /** - * Prepare for input from the previously written output - * - * @return Data Input - */ - private DataInput prepareInput() { - this.inputStream = new ByteArrayInputStream(this.outputStream.toByteArray()); - return new DataInputStream(this.inputStream); - } - - /** - * Prepare for input from the given data - * - * @param data - * Data - * @return Data Input - */ - @SuppressWarnings("unused") - private DataInput prepareInput(byte[] data) { - this.inputStream = new ByteArrayInputStream(data); - return new DataInputStream(this.inputStream); - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - private <T extends WritableComparable> void testWriteRead(T writable, T expected) throws IOException, InstantiationException, IllegalAccessException, - ClassNotFoundException { - // Write out data - DataOutput output = this.prepareOutput(); - writable.write(output); - - // Read back in data - DataInput input = this.prepareInput(); - T actual = (T) Class.forName(writable.getClass().getName()).newInstance(); - actual.readFields(input); - - LOG.info("Original = " + writable.toString()); - LOG.info("Round Tripped = " + actual.toString()); - - // Check equivalent - Assert.assertEquals(0, expected.compareTo(actual)); - } - - /** - * Basic node writable round tripping test - * - * @throws IOException - * @throws InstantiationException - * @throws IllegalAccessException - * @throws ClassNotFoundException - */ - @Test - public void node_writable_null() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { - Node n = null; - NodeWritable nw = new NodeWritable(n); - testWriteRead(nw, nw); - } 
- - /** - * Basic node writable round tripping test - * - * @throws IOException - * @throws InstantiationException - * @throws IllegalAccessException - * @throws ClassNotFoundException - */ - @Test - @Ignore - public void node_writable_variable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { - Node n = NodeFactory.createVariable("x"); - NodeWritable nw = new NodeWritable(n); - testWriteRead(nw, nw); - } - - /** - * Basic node writable round tripping test - * - * @throws IOException - * @throws InstantiationException - * @throws IllegalAccessException - * @throws ClassNotFoundException - */ - @Test - @Ignore - public void node_writable_variable_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { - Node n = NodeFactory.createVariable("really-log-variable-name-asddsfr4545egfdgdfgfdgdtgvdg-dfgfdgdfgdfgdfg4-dfvdfgdfgdfgfdgfdgdfgdfgfdg"); - NodeWritable nw = new NodeWritable(n); - testWriteRead(nw, nw); - } - - /** - * Basic node writable round tripping test - * - * @throws IOException - * @throws InstantiationException - * @throws IllegalAccessException - * @throws ClassNotFoundException - */ - @Test - public void node_writable_uri_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { - Node n = NodeFactory.createURI("http://example.org"); - NodeWritable nw = new NodeWritable(n); - testWriteRead(nw, nw); - } - - /** - * Basic node writable round tripping test - * - * @throws IOException - * @throws InstantiationException - * @throws IllegalAccessException - * @throws ClassNotFoundException - */ - @Test - public void node_writable_uri_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { - Node n = NodeFactory.createURI("http://user:passw...@example.org/some/path?key=value#id"); - NodeWritable nw = new NodeWritable(n); - testWriteRead(nw, nw); - } - - /** - * Basic node writable round tripping test - * - * @throws IOException - * @throws InstantiationException - * @throws IllegalAccessException - * @throws ClassNotFoundException - */ - @Test - public void node_writable_literal_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { - Node n = NodeFactory.createLiteral("simple"); - NodeWritable nw = new NodeWritable(n); - testWriteRead(nw, nw); - } - - /** - * Basic node writable round tripping test - * - * @throws IOException - * @throws InstantiationException - * @throws IllegalAccessException - * @throws ClassNotFoundException - */ - @Test - public void node_writable_literal_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { - Node n = NodeFactory.createLiteral("language", "en", null); - NodeWritable nw = new NodeWritable(n); - testWriteRead(nw, nw); - } - - /** - * Basic node writable round tripping test - * - * @throws IOException - * @throws InstantiationException - * @throws IllegalAccessException - * @throws ClassNotFoundException - */ - @Test - public void node_writable_literal_03() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { - Node n = NodeFactory.createLiteral("string", XSDDatatype.XSDstring); - NodeWritable nw = new NodeWritable(n); - testWriteRead(nw, nw); - } - - /** - * Basic node writable round tripping test - * - * @throws IOException - * @throws InstantiationException - * @throws IllegalAccessException - * @throws ClassNotFoundException - */ - @Test - public void 
node_writable_literal_04() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { - Node n = NodeFactory.createLiteral("1234", XSDDatatype.XSDinteger); - NodeWritable nw = new NodeWritable(n); - testWriteRead(nw, nw); - } - - /** - * Basic node writable round tripping test - * - * @throws IOException - * @throws InstantiationException - * @throws IllegalAccessException - * @throws ClassNotFoundException - */ - @Test - public void node_writable_literal_05() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { - Node n = NodeFactory.createLiteral("123.4", XSDDatatype.XSDdecimal); - NodeWritable nw = new NodeWritable(n); - testWriteRead(nw, nw); - } - - /** - * Basic node writable round tripping test - * - * @throws IOException - * @throws InstantiationException - * @throws IllegalAccessException - * @throws ClassNotFoundException - */ - @Test - public void node_writable_literal_06() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { - Node n = NodeFactory.createLiteral("12.3e4", XSDDatatype.XSDdouble); - NodeWritable nw = new NodeWritable(n); - testWriteRead(nw, nw); - } - - /** - * Basic node writable round tripping test - * - * @throws IOException - * @throws InstantiationException - * @throws IllegalAccessException - * @throws ClassNotFoundException - */ - @Test - public void node_writable_literal_07() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { - Node n = NodeFactory.createLiteral("true", XSDDatatype.XSDboolean); - NodeWritable nw = new NodeWritable(n); - testWriteRead(nw, nw); - } - - /** - * Basic node writable round tripping test - * - * @throws IOException - * @throws InstantiationException - * @throws IllegalAccessException - * @throws ClassNotFoundException - */ - @Test - public void node_writable_bnode_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { - Node n = NodeFactory.createAnon(); - NodeWritable nw = new NodeWritable(n); - testWriteRead(nw, nw); - } - - /** - * Basic node writable round tripping test - * - * @throws IOException - * @throws InstantiationException - * @throws IllegalAccessException - * @throws ClassNotFoundException - */ - @Test - public void node_writable_bnode_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { - Node n = NodeFactory.createAnon(); - NodeWritable nw = new NodeWritable(n); - testWriteRead(nw, nw); - NodeWritable nw2 = new NodeWritable(n); - testWriteRead(nw2, nw2); - - Assert.assertEquals(0, nw.compareTo(nw2)); - } - - /** - * Basic triple writable round tripping test - * - * @throws IOException - * @throws InstantiationException - * @throws IllegalAccessException - * @throws ClassNotFoundException - */ - @Test - public void triple_writable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { - Triple t = new Triple(NodeFactory.createURI("http://example"), NodeFactory.createURI("http://predicate"), NodeFactory.createLiteral("value")); - TripleWritable tw = new TripleWritable(t); - testWriteRead(tw, tw); - } - - /** - * Basic triple writable round tripping test - * - * @throws IOException - * @throws InstantiationException - * @throws IllegalAccessException - * @throws ClassNotFoundException - */ - @Test - public void triple_writable_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { - Triple t 
= new Triple(NodeFactory.createAnon(), NodeFactory.createURI("http://predicate"), NodeFactory.createLiteral("value")); - TripleWritable tw = new TripleWritable(t); - testWriteRead(tw, tw); - } - - /** - * Basic quad writable round tripping test - * - * @throws IOException - * @throws InstantiationException - * @throws IllegalAccessException - * @throws ClassNotFoundException - */ - @Test - public void quad_writable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { - Quad q = new Quad(Quad.defaultGraphNodeGenerated, NodeFactory.createURI("http://example"), NodeFactory.createURI("http://predicate"), - NodeFactory.createLiteral("value")); - QuadWritable qw = new QuadWritable(q); - testWriteRead(qw, qw); - } - - /** - * Basic quad writable round tripping test - * - * @throws IOException - * @throws InstantiationException - * @throws IllegalAccessException - * @throws ClassNotFoundException - */ - @Test - public void quad_writable_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { - Quad q = new Quad(Quad.defaultGraphNodeGenerated, NodeFactory.createAnon(), NodeFactory.createURI("http://predicate"), - NodeFactory.createLiteral("value")); - QuadWritable qw = new QuadWritable(q); - testWriteRead(qw, qw); - } - - /** - * Basic tuple writable round tripping test - * - * @throws IOException - * @throws InstantiationException - * @throws IllegalAccessException - * @throws ClassNotFoundException - */ - @Test - public void tuple_writable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { - Tuple<Node> t = Tuple.createTuple(NodeFactory.createURI("http://one"), NodeFactory.createURI("http://two"), NodeFactory.createLiteral("value"), - NodeFactory.createLiteral("foo"), NodeFactory.createURI("http://three")); - NodeTupleWritable tw = new NodeTupleWritable(t); - testWriteRead(tw, tw); - } -} http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf-io/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-rdf-io/pom.xml b/hadoop-rdf-io/pom.xml deleted file mode 100644 index f7dbed1..0000000 --- a/hadoop-rdf-io/pom.xml +++ /dev/null @@ -1,100 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
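Everything exercised by RdfTypesTest implements WritableComparable, which is what lets these types serve directly as MapReduce keys and values. A hypothetical mapper using them — not part of this commit, and it assumes TripleWritable exposes the wrapped Triple via get():

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.jena.hadoop.rdf.types.NodeWritable;
import org.apache.jena.hadoop.rdf.types.TripleWritable;

// Illustrative only: emit (subject, 1) for every input triple so a reducer
// can count triples per subject.
public class SubjectCountMapper extends Mapper<LongWritable, TripleWritable, NodeWritable, LongWritable> {

    private static final LongWritable ONE = new LongWritable(1);

    @Override
    protected void map(LongWritable key, TripleWritable value, Context context)
            throws IOException, InterruptedException {
        context.write(new NodeWritable(value.get().getSubject()), ONE);
    }
}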
---> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.jena</groupId> - <artifactId>jena-hadoop-rdf</artifactId> - <version>0.9.0-SNAPSHOT</version> - </parent> - <artifactId>jena-hadoop-rdf-io</artifactId> - <name>Apache Jena - RDF Tools for Hadoop - I/O</name> - <description>RDF Input/Output formats library for Hadoop</description> - - <!-- Note that versions are managed by parent POMs --> - <dependencies> - <!-- Internal Project Dependencies --> - <dependency> - <groupId>org.apache.jena</groupId> - <artifactId>jena-hadoop-rdf-common</artifactId> - <version>${project.version}</version> - </dependency> - - <!-- Hadoop Dependencies --> - <!-- Note these will be provided on the Hadoop cluster hence the provided - scope --> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <scope>provided</scope> -<!-- <exclusions> - hadoop-common apparently has all sorts of dependency convergence - issues as of 2.4.0 - Exclude this since it transitively has an outdated jackson-core-asl - dependency - <exclusion> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-json</artifactId> - </exclusion> - Exclude these since they have outdated commons-logging dependencies - <exclusion> - <groupId>commons-httpclient</groupId> - <artifactId>commons-httpclient</artifactId> - </exclusion> - <exclusion> - <groupId>commons-el</groupId> - <artifactId>commons-el</artifactId> - </exclusion> - <exclusion> - <groupId>net.java.dev.jets3t</groupId> - <artifactId>jets3t</artifactId> - </exclusion> - Exclude these since it has an outdated dependency on servlet-api - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-runtime</artifactId> - </exclusion> - Exclude these since it has an outdated dependency on commons-codec - <exclusion> - <groupId>commons-codec</groupId> - <artifactId>commons-codec</artifactId> - </exclusion> - </exclusions> --> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-common</artifactId> - <scope>provided</scope> - </dependency> - - <!-- Jena dependencies --> - <dependency> - <groupId>org.apache.jena</groupId> - <artifactId>jena-arq</artifactId> - </dependency> - - <!-- Test Dependencies --> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - </dependencies> -</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/HadoopIOConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/HadoopIOConstants.java b/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/HadoopIOConstants.java deleted file mode 100644 index 5c1b41c..0000000 --- a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/HadoopIOConstants.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.jena.hadoop.rdf.io; - -/** - * Hadoop IO related constants - * - * - * - */ -public class HadoopIOConstants { - - /** - * Private constructor prevents instantiation - */ - private HadoopIOConstants() { - } - - /** - * Map Reduce configuration setting for max line length - */ - public static final String MAX_LINE_LENGTH = "mapreduce.input.linerecordreader.line.maxlength"; - - /** - * Run ID - */ - public static final String RUN_ID = "runId"; - - /** - * Compression codecs to use - */ - public static final String IO_COMPRESSION_CODECS = "io.compression.codecs"; -} http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/RdfIOConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/RdfIOConstants.java b/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/RdfIOConstants.java deleted file mode 100644 index 3b062c2..0000000 --- a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/RdfIOConstants.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.jena.hadoop.rdf.io; - -import java.io.IOException; - -/** - * RDF IO related constants - * - * - * - */ -public class RdfIOConstants { - - /** - * Private constructor prevents instantiation - */ - private RdfIOConstants() { - } - - /** - * Configuration key used to set whether bad tuples are ignored. This is the - * default behaviour, when explicitly set to {@code false} bad tuples will - * result in {@link IOException} being thrown by the relevant record - * readers. - */ - public static final String INPUT_IGNORE_BAD_TUPLES = "rdf.io.input.ignore-bad-tuples"; - - /** - * Configuration key used to set the batch size used for RDF output formats - * that take a batched writing approach. Default value is given by the - * constant {@link #DEFAULT_OUTPUT_BATCH_SIZE}. 
- */ - public static final String OUTPUT_BATCH_SIZE = "rdf.io.output.batch-size"; - - /** - * Default batch size for batched output formats - */ - public static final long DEFAULT_OUTPUT_BATCH_SIZE = 10000; -} http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractNLineFileInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractNLineFileInputFormat.java b/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractNLineFileInputFormat.java deleted file mode 100644 index 1069494..0000000 --- a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractNLineFileInputFormat.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.jena.hadoop.rdf.io.input; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; - -/** - * Abstract line-based input format that reuses the machinery from - * {@link NLineInputFormat} to calculate the splits - * - * - * - * @param <TKey> - * Key type - * @param <TValue> - * Value type - */ -public abstract class AbstractNLineFileInputFormat<TKey, TValue> extends FileInputFormat<TKey, TValue> { - - /** - * Logically splits the set of input files for the job, treating every N lines of - * the input as one split. 
- * - * @see FileInputFormat#getSplits(JobContext) - */ - public final List<InputSplit> getSplits(JobContext job) throws IOException { - List<InputSplit> splits = new ArrayList<InputSplit>(); - int numLinesPerSplit = NLineInputFormat.getNumLinesPerSplit(job); - for (FileStatus status : listStatus(job)) { - splits.addAll(NLineInputFormat.getSplitsForFile(status, job.getConfiguration(), numLinesPerSplit)); - } - return splits; - } -} http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java b/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java deleted file mode 100644 index e561cdb..0000000 --- a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.jena.hadoop.rdf.io.input; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; - -/** - * Abstract implementation of a whole file input format where each file is a - * single split - * - * - * - * @param <TKey> - * Key type - * @param <TValue> - * Value type - */ -public abstract class AbstractWholeFileInputFormat<TKey, TValue> extends FileInputFormat<TKey, TValue> { - - @Override - protected final boolean isSplitable(JobContext context, Path filename) { - return false; - } -} http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/BlockedNQuadsInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/BlockedNQuadsInputFormat.java b/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/BlockedNQuadsInputFormat.java deleted file mode 100644 index 4c7c51b..0000000 --- a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/BlockedNQuadsInputFormat.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
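Both abstract input formats above are driven purely by job configuration: AbstractNLineFileInputFormat delegates split calculation to NLineInputFormat, while AbstractWholeFileInputFormat simply refuses to split. A hedged driver-side sketch of the relevant knobs, with illustrative values, using the RdfIOConstants keys defined earlier in this diff:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.jena.hadoop.rdf.io.RdfIOConstants;

public class RdfIoDriverSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "rdf-io-sketch");

        // For the line-based formats: how many lines each split should cover
        NLineInputFormat.setNumLinesPerSplit(job, 10000); // illustrative value

        // Fail fast on malformed tuples rather than silently skipping them
        job.getConfiguration().setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);

        // Batch size for the batched output formats
        job.getConfiguration().setLong(RdfIOConstants.OUTPUT_BATCH_SIZE, 5000); // illustrative value
    }
}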
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.jena.hadoop.rdf.io.input; - -import java.io.IOException; - -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.jena.hadoop.rdf.io.input.readers.BlockedNQuadsReader; -import org.apache.jena.hadoop.rdf.types.QuadWritable; - - -/** - * NQuads input format where files are processed as blocks of lines rather - * than in a line based manner as with the {@link NQuadsInputFormat} or as - * whole files with the {@link WholeFileNQuadsInputFormat} - * <p> - * This provides a compromise between the higher parser setup cost of creating more - * parsers and the benefit of being able to split input files over multiple - * mappers. - * </p> - * - * - * - */ -public class BlockedNQuadsInputFormat extends AbstractNLineFileInputFormat<LongWritable, QuadWritable> { - - @Override - public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - return new BlockedNQuadsReader(); - } - -} http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/BlockedNTriplesInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/BlockedNTriplesInputFormat.java b/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/BlockedNTriplesInputFormat.java deleted file mode 100644 index edc3dfa..0000000 --- a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/BlockedNTriplesInputFormat.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.jena.hadoop.rdf.io.input; - -import java.io.IOException; - -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.jena.hadoop.rdf.io.input.readers.BlockedNTriplesReader; -import org.apache.jena.hadoop.rdf.types.TripleWritable; - - -/** - * NTriples input format where files are processed as blocks of lines rather - * than in a line based manner as with the {@link NTriplesInputFormat} or as - * whole files with the {@link WholeFileNTriplesInputFormat} - * <p> - * This provides a compromise between the higher parser setup cost of creating more - * parsers and the benefit of being able to split input files over multiple - * mappers. - * </p> - * - * - * - */ -public class BlockedNTriplesInputFormat extends AbstractNLineFileInputFormat<LongWritable, TripleWritable> { - - @Override - public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - return new BlockedNTriplesReader(); - } - -} http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/NQuadsInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/NQuadsInputFormat.java b/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/NQuadsInputFormat.java deleted file mode 100644 index ef68e87..0000000 --- a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/NQuadsInputFormat.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
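As the javadoc for the two blocked formats says, they sit between the per-line and whole-file variants: fewer parser instances than line-based reading, while still letting a large file be split across mappers. A hypothetical job wiring for the blocked NTriples flavour:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.jena.hadoop.rdf.io.input.BlockedNTriplesInputFormat;

public class BlockedInputSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "blocked-ntriples-sketch");

        // Each split is a block of lines handled by a single NTriples parser;
        // mappers then receive (LongWritable, TripleWritable) pairs.
        job.setInputFormatClass(BlockedNTriplesInputFormat.class);
    }
}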
- */ - -package org.apache.jena.hadoop.rdf.io.input; - -import java.io.IOException; - -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.jena.hadoop.rdf.io.input.readers.NQuadsReader; -import org.apache.jena.hadoop.rdf.types.QuadWritable; - - -/** - * NQuads input format - * - * - * - */ -public class NQuadsInputFormat extends AbstractNLineFileInputFormat<LongWritable, QuadWritable> { - - @Override - public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) - throws IOException, InterruptedException { - return new NQuadsReader(); - } - -} http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/NTriplesInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/NTriplesInputFormat.java b/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/NTriplesInputFormat.java deleted file mode 100644 index a3fca0d..0000000 --- a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/NTriplesInputFormat.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.jena.hadoop.rdf.io.input; - -import java.io.IOException; - -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.jena.hadoop.rdf.io.input.readers.NTriplesReader; -import org.apache.jena.hadoop.rdf.types.TripleWritable; - - -/** - * NTriples input format - * - * - * - */ -public class NTriplesInputFormat extends AbstractNLineFileInputFormat<LongWritable, TripleWritable> { - - @Override - public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext context) - throws IOException, InterruptedException { - return new NTriplesReader(); - } - -} http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/QuadsInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/QuadsInputFormat.java b/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/QuadsInputFormat.java deleted file mode 100644 index b8fdbd5..0000000 --- a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/QuadsInputFormat.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.jena.hadoop.rdf.io.input; - -import java.io.IOException; - -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.jena.hadoop.rdf.io.input.readers.QuadsReader; -import org.apache.jena.hadoop.rdf.types.QuadWritable; - - -/** - * RDF input format that can handle any RDF quads format that ARQ supports, - * selecting the format to use for each file based upon the file extension - * - * - * - */ -public class QuadsInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> { - - @Override - public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - return new QuadsReader(); - } - -} http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/RdfJsonInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/RdfJsonInputFormat.java b/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/RdfJsonInputFormat.java deleted file mode 100644 index 2b0fa7b..0000000 --- a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/RdfJsonInputFormat.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.jena.hadoop.rdf.io.input; - -import java.io.IOException; - -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.jena.hadoop.rdf.io.input.readers.RdfJsonReader; -import org.apache.jena.hadoop.rdf.types.TripleWritable; - - -/** - * RDF/JSON input format - * - * - * - */ -public class RdfJsonInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> { - - @Override - public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - return new RdfJsonReader(); - } - -} http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/RdfXmlInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/RdfXmlInputFormat.java b/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/RdfXmlInputFormat.java deleted file mode 100644 index 3f3c6c4..0000000 --- a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/RdfXmlInputFormat.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.jena.hadoop.rdf.io.input; - -import java.io.IOException; - -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.jena.hadoop.rdf.io.input.readers.RdfXmlReader; -import org.apache.jena.hadoop.rdf.types.TripleWritable; - - -/** - * RDF/XML input format - * - * - * - */ -public class RdfXmlInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> { - - @Override - public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - return new RdfXmlReader(); - } - -} http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriGInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriGInputFormat.java b/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriGInputFormat.java deleted file mode 100644 index dd7a742..0000000 --- a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriGInputFormat.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.jena.hadoop.rdf.io.input; - -import java.io.IOException; - -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.jena.hadoop.rdf.io.input.readers.TriGReader; -import org.apache.jena.hadoop.rdf.types.QuadWritable; - - -/** - * Input format for TriG - * - * - * - */ -public class TriGInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> { - - @Override - public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - return new TriGReader(); - } - -} http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesInputFormat.java b/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesInputFormat.java deleted file mode 100644 index 24d93ba..0000000 --- a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesInputFormat.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.jena.hadoop.rdf.io.input; - -import java.io.IOException; - -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.jena.hadoop.rdf.io.input.readers.TriplesReader; -import org.apache.jena.hadoop.rdf.types.TripleWritable; - - -/** - * RDF input format that can handle any RDF triples format that ARQ supports - * selecting the format to use for each file based upon the file extension - * - * - * - */ -public class TriplesInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> { - - @Override - public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - return new TriplesReader(); - } - -} http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesOrQuadsInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesOrQuadsInputFormat.java b/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesOrQuadsInputFormat.java deleted file mode 100644 index bfd643e..0000000 --- a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesOrQuadsInputFormat.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.jena.hadoop.rdf.io.input; - -import java.io.IOException; - -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.jena.hadoop.rdf.io.input.readers.TriplesOrQuadsReader; -import org.apache.jena.hadoop.rdf.types.QuadWritable; - - -/** - * RDF input format that can handle any RDF triple/quads format that ARQ - * supports selecting the format to use for each file based upon the file - * extension. Triples are converted into quads in the default graph. 
- * - * - * - */ -public class TriplesOrQuadsInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> { - - @Override - public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - return new TriplesOrQuadsReader(); - } - -} http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TurtleInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TurtleInputFormat.java b/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TurtleInputFormat.java deleted file mode 100644 index 12ee90d..0000000 --- a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TurtleInputFormat.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.jena.hadoop.rdf.io.input; - -import java.io.IOException; - -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.jena.hadoop.rdf.io.input.readers.TurtleReader; -import org.apache.jena.hadoop.rdf.types.TripleWritable; - - -/** - * Turtle input format - * - * - * - */ -public class TurtleInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> { - - @Override - public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - return new TurtleReader(); - } - -} http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/WholeFileNQuadsInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/WholeFileNQuadsInputFormat.java b/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/WholeFileNQuadsInputFormat.java deleted file mode 100644 index ad4436a..0000000 --- a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/WholeFileNQuadsInputFormat.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.jena.hadoop.rdf.io.input; - -import java.io.IOException; - -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.jena.hadoop.rdf.io.input.readers.WholeFileNQuadsReader; -import org.apache.jena.hadoop.rdf.types.QuadWritable; - - -/** - * NQuads input format where files are processed as complete files rather than - * in a line based manner as with the {@link NQuadsInputFormat} - * <p> - * This has the advantage of less parser setup overhead but the disadvantage - * that the input cannot be split over multiple mappers. - * </p> - * - * - * - */ -public class WholeFileNQuadsInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> { - - @Override - public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - return new WholeFileNQuadsReader(); - } - -} http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/WholeFileNTriplesInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/WholeFileNTriplesInputFormat.java b/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/WholeFileNTriplesInputFormat.java deleted file mode 100644 index d866928..0000000 --- a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/WholeFileNTriplesInputFormat.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.jena.hadoop.rdf.io.input; - -import java.io.IOException; - -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.jena.hadoop.rdf.io.input.readers.WholeFileNTriplesReader; -import org.apache.jena.hadoop.rdf.types.TripleWritable; - - -/** - * NTriples input format where files are processed as complete files rather than - * in a line based manner as with the {@link NTriplesInputFormat} - * <p> - * This has the advantage of less parser setup overhead but the disadvantage - * that the input cannot be split over multiple mappers. - * </p> - * - * - * - */ -public class WholeFileNTriplesInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> { - - @Override - public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - return new WholeFileNTriplesReader(); - } - -} http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedNodeTupleReader.java ---------------------------------------------------------------------- diff --git a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedNodeTupleReader.java b/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedNodeTupleReader.java deleted file mode 100644 index 4b01a7d..0000000 --- a/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedNodeTupleReader.java +++ /dev/null @@ -1,337 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-package org.apache.jena.hadoop.rdf.io.input.readers;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
-import org.apache.jena.hadoop.rdf.io.input.util.BlockInputStream;
-import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
-import org.apache.jena.hadoop.rdf.io.input.util.TrackedInputStream;
-import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
-import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
-import org.apache.jena.riot.Lang;
-import org.apache.jena.riot.RDFDataMgr;
-import org.apache.jena.riot.lang.PipedRDFIterator;
-import org.apache.jena.riot.lang.PipedRDFStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An abstract implementation of a record reader that reads records from blocks
- * of files. This is a hybrid between {@link AbstractLineBasedNodeTupleReader}
- * and {@link AbstractWholeFileNodeTupleReader}: it can only be used with
- * formats that can be split by lines, but it reduces overhead by parsing the
- * split as a whole rather than as individual lines.
- * <p>
- * The keys produced are the approximate position in the file at which a tuple
- * was found and the values will be node tuples. Positions are approximate
- * because they are recorded after the point at which the most recent tuple was
- * parsed from the input, and thus reflect the approximate position in the
- * stream immediately after that tuple was found.
- * </p> - * - * - * - * @param <TValue> - * Value type - * @param <T> - * Tuple type - */ -public abstract class AbstractBlockBasedNodeTupleReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends RecordReader<LongWritable, T> { - - private static final Logger LOG = LoggerFactory.getLogger(AbstractBlockBasedNodeTupleReader.class); - private CompressionCodec compressionCodecs; - private TrackableInputStream input; - private LongWritable key; - private long start, length; - private T tuple; - private TrackedPipedRDFStream<TValue> stream; - private PipedRDFIterator<TValue> iter; - private Thread parserThread; - private boolean finished = false; - private boolean ignoreBadTuples = true; - private boolean parserFinished = false; - private Throwable parserError = null; - - @Override - public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException { - LOG.debug("initialize({}, {})", genericSplit, context); - - // Assuming file split - if (!(genericSplit instanceof FileSplit)) - throw new IOException("This record reader only supports FileSplit inputs"); - FileSplit split = (FileSplit) genericSplit; - - // Configuration - Configuration config = context.getConfiguration(); - this.ignoreBadTuples = config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true); - if (this.ignoreBadTuples) - LOG.warn( - "Configured to ignore bad tuples, parsing errors will be logged and further parsing aborted but no user visible errors will be thrown. Consider setting {} to false to disable this behaviour", - RdfIOConstants.INPUT_IGNORE_BAD_TUPLES); - - // Figure out what portion of the file to read - start = split.getStart(); - long end = start + split.getLength(); - final Path file = split.getPath(); - long totalLength = file.getFileSystem(context.getConfiguration()).getFileStatus(file).getLen(); - boolean readToEnd = end == totalLength; - CompressionCodecFactory factory = new CompressionCodecFactory(config); - this.compressionCodecs = factory.getCodec(file); - - LOG.info(String.format("Got split with start %d and length %d for file with total length of %d", new Object[] { start, split.getLength(), totalLength })); - - // Open the file and prepare the input stream - FileSystem fs = file.getFileSystem(config); - FSDataInputStream fileIn = fs.open(file); - this.length = split.getLength(); - if (start > 0) - fileIn.seek(start); - - if (this.compressionCodecs != null) { - // Compressed input - // For compressed input NLineInputFormat will have failed to find - // any line breaks and will give us a split from 0 -> (length - 1) - // Add 1 and re-verify readToEnd so we can abort correctly if ever - // given a partial split of a compressed file - end++; - readToEnd = end == totalLength; - if (start > 0 || !readToEnd) - throw new IOException("This record reader can only be used with compressed input where the split is a whole file"); - input = new TrackedInputStream(this.compressionCodecs.createInputStream(fileIn)); - } else { - // Uncompressed input - - if (readToEnd) { - input = new TrackedInputStream(fileIn); - } else { - // Need to limit the portion of the file we are reading - input = new BlockInputStream(fileIn, split.getLength()); - } - } - - // Set up background thread for parser - iter = this.getPipedIterator(); - this.stream = this.getPipedStream(iter, this.input); - Runnable parserRunnable = this.createRunnable(this, this.input, stream, this.getRdfLanguage()); - this.parserThread = new Thread(parserRunnable); - this.parserThread.setDaemon(true); - 
this.parserThread.start(); - } - - /** - * Gets the RDF iterator to use - * - * @return Iterator - */ - protected abstract PipedRDFIterator<TValue> getPipedIterator(); - - /** - * Gets the RDF stream to parse to - * - * @param iterator - * Iterator - * @return RDF stream - */ - protected abstract TrackedPipedRDFStream<TValue> getPipedStream(PipedRDFIterator<TValue> iterator, TrackableInputStream input); - - /** - * Gets the RDF language to use for parsing - * - * @return - */ - protected abstract Lang getRdfLanguage(); - - /** - * Creates the runnable upon which the parsing will run - * - * @param input - * Input - * @param stream - * Stream - * @param lang - * Language to use for parsing - * @return Parser runnable - */ - private Runnable createRunnable(@SuppressWarnings("rawtypes") final AbstractBlockBasedNodeTupleReader reader, final InputStream input, - final PipedRDFStream<TValue> stream, final Lang lang) { - return new Runnable() { - @Override - public void run() { - try { - RDFDataMgr.parse(stream, input, null, lang); - reader.setParserFinished(null); - } catch (Throwable e) { - reader.setParserFinished(e); - } - } - }; - } - - /** - * Sets the parser thread finished state - * - * @param e - * Error (if any) - */ - private void setParserFinished(Throwable e) { - synchronized (this.parserThread) { - this.parserError = e; - this.parserFinished = true; - } - } - - /** - * Waits for the parser thread to have reported as finished - * - * @throws InterruptedException - */ - private void waitForParserFinished() throws InterruptedException { - do { - synchronized (this.parserThread) { - if (this.parserFinished) - return; - } - Thread.sleep(50); - } while (true); - } - - /** - * Creates an instance of a writable tuple from the given tuple value - * - * @param tuple - * Tuple value - * @return Writable tuple - */ - protected abstract T createInstance(TValue tuple); - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - // Reuse key for efficiency - if (key == null) { - key = new LongWritable(); - } - - if (this.finished) - return false; - - try { - if (this.iter.hasNext()) { - // Position will be relative to the start for the split we're - // processing - Long l = this.start + this.stream.getPosition(); - if (l != null) { - this.key.set(l); - // For compressed input the actual length from which we - // calculate progress is likely less than the actual - // uncompressed length so we need to increment the - // length as we go along - // We always add 1 more than the current length because we - // don't want to report 100% progress until we really have - // finished - if (this.compressionCodecs != null && l > this.length) - this.length = l + 1; - } - this.tuple = this.createInstance(this.iter.next()); - return true; - } else { - // Need to ensure that the parser thread has finished in order - // to determine whether we finished without error - this.waitForParserFinished(); - if (this.parserError != null) { - LOG.error("Error parsing block, aborting further parsing", this.parserError); - if (!this.ignoreBadTuples) - throw new IOException("Error parsing block at position " + (this.start + this.input.getBytesRead()) + ", aborting further parsing", - this.parserError); - } - - this.key = null; - this.tuple = null; - this.finished = true; - // This is necessary so that when compressed input is used we - // report 100% progress once we've reached the genuine end of - // the stream - if (this.compressionCodecs != null) - this.length--; - return false; - } - } catch 
(IOException e) {
-            throw e;
-        } catch (Throwable e) {
-            // Failed to read a tuple from the block
-            LOG.error("Error parsing block, aborting further parsing", e);
-            if (!this.ignoreBadTuples) {
-                this.iter.close();
-                throw new IOException("Error parsing block at position " + (this.start + this.input.getBytesRead()) + ", aborting further parsing", e);
-            }
-            this.key = null;
-            this.tuple = null;
-            this.finished = true;
-            return false;
-        }
-    }
-
-    @Override
-    public LongWritable getCurrentKey() throws IOException, InterruptedException {
-        return this.key;
-    }
-
-    @Override
-    public T getCurrentValue() throws IOException, InterruptedException {
-        return this.tuple;
-    }
-
-    @Override
-    public float getProgress() throws IOException, InterruptedException {
-        float progress = 0.0f;
-        if (this.key == null) {
-            // We've either not started or we've finished
-            progress = (this.finished ? 1.0f : 0.0f);
-        } else if (this.key.get() == Long.MIN_VALUE) {
-            // We don't have a position so we're either in progress or finished
-            progress = (this.finished ? 1.0f : 0.5f);
-        } else {
-            // We're some way through the file
-            progress = (this.key.get() - this.start) / (float) this.length;
-        }
-        LOG.debug("getProgress() --> {}", progress);
-        return progress;
-    }
-
-    @Override
-    public void close() throws IOException {
-        this.iter.close();
-        this.input.close();
-        this.finished = true;
-    }
-
-}
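
[Editorial note, not part of the commit: for readers tracing what this change removes, the deleted input formats above were consumed as ordinary Hadoop input formats. A minimal map-only job sketch; the driver class, mapper, and path handling are illustrative, while TurtleInputFormat and TripleWritable are taken from the diff above.]

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.jena.hadoop.rdf.io.input.TurtleInputFormat;
import org.apache.jena.hadoop.rdf.types.TripleWritable;

public class TurtleReadExample {

    // Hypothetical mapper: passes each triple through, keyed by its
    // approximate position in the input file
    public static class TripleMapper extends Mapper<LongWritable, TripleWritable, LongWritable, TripleWritable> {
        @Override
        protected void map(LongWritable key, TripleWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "turtle-read-example");
        job.setJarByClass(TurtleReadExample.class);
        // Whole-file input format from the diff above: Turtle cannot be
        // safely split, so each input file goes to a single mapper
        job.setInputFormatClass(TurtleInputFormat.class);
        job.setMapperClass(TripleMapper.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(TripleWritable.class);
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}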
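[Editorial note: the background-parser arrangement that AbstractBlockBasedNodeTupleReader sets up in initialize() can be shown in isolation with Jena's piped RDF classes. A sketch using plain RIOT APIs and Jena 3.x package names, rather than the deleted Tracked* wrappers; error propagation via the finished/error flags is omitted for brevity.]

import java.io.InputStream;

import org.apache.jena.graph.Triple;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFDataMgr;
import org.apache.jena.riot.lang.PipedRDFIterator;
import org.apache.jena.riot.lang.PipedTriplesStream;

public class PipedParseExample {

    public static void consume(final InputStream in) {
        // Consumer side: iterator; producer side: stream fed by the parser
        final PipedRDFIterator<Triple> iter = new PipedRDFIterator<>();
        final PipedTriplesStream stream = new PipedTriplesStream(iter);

        // Parse on a daemon thread, as the record reader does, so tuples can
        // be consumed while parsing of the split is still in progress
        Thread parser = new Thread(new Runnable() {
            @Override
            public void run() {
                RDFDataMgr.parse(stream, in, Lang.TURTLE);
            }
        });
        parser.setDaemon(true);
        parser.start();

        while (iter.hasNext()) {
            System.out.println(iter.next());
        }
        iter.close();
    }
}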
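[Editorial note: the reader's ignoreBadTuples behaviour is driven by RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, which defaults to true as the warning in initialize() notes. A sketch of opting into strict parsing; the wrapper class is illustrative, the constant comes from the diff above.]

import org.apache.hadoop.conf.Configuration;
import org.apache.jena.hadoop.rdf.io.RdfIOConstants;

public class StrictParsingConfig {

    public static Configuration strictConf() {
        Configuration conf = new Configuration();
        // Surface parser errors as job failures rather than logging them
        // and silently producing a truncated set of tuples
        conf.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
        return conf;
    }
}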