HIVE-13529: Move around some of the classes created during llap branch work
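For downstream consumers, the practical effect of this commit is a package move: the client-facing input formats leave org.apache.hive.jdbc (jdbc module) for org.apache.hadoop.hive.llap (new llap-ext-client module), while the record readers, splits, and umbilical client land in llap-client. Below is a minimal sketch of driving the relocated API; the class names and llap.if.* configuration keys are taken from this diff (the keys from the deleted jdbc LlapBaseInputFormat, assumed carried over unchanged in the relocated copy), and the connection URL, user, and query values are illustrative placeholders.

    import org.apache.hadoop.hive.llap.LlapRowInputFormat;
    import org.apache.hadoop.hive.llap.Row;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;

    public class LlapScanSketch {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        // Connection settings read by LlapBaseInputFormat.getSplits() (placeholder values).
        job.set("llap.if.hs2.connection", "jdbc:hive2://localhost:10000/default");
        job.set("llap.if.user", "hive");
        job.set("llap.if.pwd", "");
        job.set("llap.if.query", "select * from test");

        // getSplits() runs get_splits() over JDBC; getRecordReader() then reads from LLAP directly.
        LlapRowInputFormat inputFormat = new LlapRowInputFormat();
        for (InputSplit split : inputFormat.getSplits(job, 1)) {
          RecordReader<NullWritable, Row> reader = inputFormat.getRecordReader(split, job, null);
          Row row = reader.createValue();
          while (reader.next(NullWritable.get(), row)) {
            // Each Row is laid out according to the split's Schema.
          }
          reader.close();
        }
      }
    }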
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7b9096a9 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7b9096a9 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7b9096a9 Branch: refs/heads/llap Commit: 7b9096a922f9706909ba0e52d8188d182a79612f Parents: fc7343d Author: Jason Dere <jd...@hortonworks.com> Authored: Fri Apr 15 16:45:32 2016 -0700 Committer: Jason Dere <jd...@hortonworks.com> Committed: Fri Apr 15 16:45:32 2016 -0700 ---------------------------------------------------------------------- itests/hive-unit/pom.xml | 5 + .../hadoop/hive/jdbc/TestLlapInputSplit.java | 100 ----- .../hive/llap/ext/TestLlapInputSplit.java | 100 +++++ .../apache/hive/jdbc/TestJdbcWithMiniLlap.java | 4 +- .../apache/hive/jdbc/LlapBaseInputFormat.java | 135 ------ .../src/java/org/apache/hive/jdbc/LlapDump.java | 164 -------- .../org/apache/hive/jdbc/LlapInputSplit.java | 73 ---- .../apache/hive/jdbc/LlapRowInputFormat.java | 34 -- llap-client/pom.xml | 32 ++ .../hadoop/hive/llap/LlapBaseRecordReader.java | 205 +++++++++ .../hadoop/hive/llap/LlapInputFormat.java | 392 ++++++++++++++++++ .../apache/hadoop/hive/llap/LlapInputSplit.java | 131 ++++++ .../hadoop/hive/llap/LlapRowRecordReader.java | 155 +++++++ .../apache/hadoop/hive/llap/SubmitWorkInfo.java | 103 +++++ .../ext/LlapTaskUmbilicalExternalClient.java | 415 +++++++++++++++++++ .../helpers/LlapTaskUmbilicalServer.java | 57 +++ .../hadoop/hive/llap/LlapRowRecordReader.java | 155 ------- llap-ext-client/pom.xml | 140 +++++++ .../hadoop/hive/llap/LlapBaseInputFormat.java | 136 ++++++ .../org/apache/hadoop/hive/llap/LlapDump.java | 165 ++++++++ .../hadoop/hive/llap/LlapRowInputFormat.java | 36 ++ .../apache/hive/llap/ext/LlapInputSplit.java | 73 ++++ .../hadoop/hive/llap/LlapInputFormat.java | 392 ------------------ .../ext/LlapTaskUmbilicalExternalClient.java | 415 ------------------- .../helpers/LlapTaskUmbilicalServer.java | 57 --- pom.xml | 1 + .../hadoop/hive/llap/LlapBaseRecordReader.java | 205 --------- .../apache/hadoop/hive/llap/LlapInputSplit.java | 131 ------ .../apache/hadoop/hive/llap/SubmitWorkInfo.java | 103 ----- 29 files changed, 2148 insertions(+), 1966 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/itests/hive-unit/pom.xml ---------------------------------------------------------------------- diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml index ae231de..b248673 100644 --- a/itests/hive-unit/pom.xml +++ b/itests/hive-unit/pom.xml @@ -61,6 +61,11 @@ </dependency> <dependency> <groupId>org.apache.hive</groupId> + <artifactId>hive-llap-ext-client</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> <artifactId>hive-llap-server</artifactId> <version>${project.version}</version> <type>test-jar</type> http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java deleted file mode 100644 index 366e326..0000000 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java +++ /dev/null @@ -1,100 +0,0 @@ -package org.apache.hive.jdbc; 
- -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.util.ArrayList; -import java.util.HashMap; - -import org.apache.hadoop.io.Text; - -import org.apache.hadoop.hive.llap.Schema; -import org.apache.hadoop.hive.llap.FieldDesc; -import org.apache.hadoop.hive.llap.TypeDesc; - -import org.apache.hadoop.mapred.SplitLocationInfo; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import static org.junit.Assert.*; - -public class TestLlapInputSplit { - - @Test - public void testWritable() throws Exception { - int splitNum = 88; - byte[] planBytes = "0123456789987654321".getBytes(); - byte[] fragmentBytes = "abcdefghijklmnopqrstuvwxyz".getBytes(); - SplitLocationInfo[] locations = { - new SplitLocationInfo("location1", false), - new SplitLocationInfo("location2", false), - }; - - ArrayList<FieldDesc> colDescs = new ArrayList<FieldDesc>(); - colDescs.add(new FieldDesc("col1", new TypeDesc(TypeDesc.Type.STRING))); - colDescs.add(new FieldDesc("col2", new TypeDesc(TypeDesc.Type.INT))); - Schema schema = new Schema(colDescs); - - org.apache.hadoop.hive.llap.LlapInputSplit split1 = new org.apache.hadoop.hive.llap.LlapInputSplit( - splitNum, - planBytes, - fragmentBytes, - locations, - schema, - "hive"); - ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream(); - DataOutputStream dataOut = new DataOutputStream(byteOutStream); - split1.write(dataOut); - ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray()); - DataInputStream dataIn = new DataInputStream(byteInStream); - org.apache.hadoop.hive.llap.LlapInputSplit split2 = new org.apache.hadoop.hive.llap.LlapInputSplit(); - split2.readFields(dataIn); - - // Did we read all the data? 
- assertEquals(0, byteInStream.available()); - - checkLlapSplits(split1, split2); - - // Try JDBC LlapInputSplits - org.apache.hive.jdbc.LlapInputSplit<Text> jdbcSplit1 = - new org.apache.hive.jdbc.LlapInputSplit<Text>(split1, "org.apache.hadoop.hive.llap.LlapInputFormat"); - byteOutStream.reset(); - jdbcSplit1.write(dataOut); - byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray()); - dataIn = new DataInputStream(byteInStream); - org.apache.hive.jdbc.LlapInputSplit<Text> jdbcSplit2 = new org.apache.hive.jdbc.LlapInputSplit<Text>(); - jdbcSplit2.readFields(dataIn); - - assertEquals(0, byteInStream.available()); - - checkLlapSplits( - (org.apache.hadoop.hive.llap.LlapInputSplit) jdbcSplit1.getSplit(), - (org.apache.hadoop.hive.llap.LlapInputSplit) jdbcSplit2.getSplit()); - assertEquals(jdbcSplit1.getInputFormat().getClass(), jdbcSplit2.getInputFormat().getClass()); - } - - static void checkLlapSplits( - org.apache.hadoop.hive.llap.LlapInputSplit split1, - org.apache.hadoop.hive.llap.LlapInputSplit split2) throws Exception { - - assertEquals(split1.getSplitNum(), split2.getSplitNum()); - assertArrayEquals(split1.getPlanBytes(), split2.getPlanBytes()); - assertArrayEquals(split1.getFragmentBytes(), split2.getFragmentBytes()); - SplitLocationInfo[] locationInfo1 = split1.getLocationInfo(); - SplitLocationInfo[] locationInfo2 = split2.getLocationInfo(); - for (int idx = 0; idx < locationInfo1.length; ++idx) { - assertEquals(locationInfo1[idx].getLocation(), locationInfo2[idx].getLocation()); - assertEquals(locationInfo1[idx].isInMemory(), locationInfo2[idx].isInMemory()); - assertEquals(locationInfo1[idx].isOnDisk(), locationInfo2[idx].isOnDisk()); - } - assertArrayEquals(split1.getLocations(), split2.getLocations()); - assertEquals(split1.getSchema().toString(), split2.getSchema().toString()); - assertEquals(split1.getLlapUser(), split2.getLlapUser()); - } - -} http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java new file mode 100644 index 0000000..04da17e --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java @@ -0,0 +1,100 @@ +package org.apache.hadoop.hive.llap.ext; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.util.ArrayList; +import java.util.HashMap; + +import org.apache.hadoop.io.Text; + +import org.apache.hadoop.hive.llap.Schema; +import org.apache.hadoop.hive.llap.FieldDesc; +import org.apache.hadoop.hive.llap.TypeDesc; + +import org.apache.hadoop.mapred.SplitLocationInfo; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import static org.junit.Assert.*; + +public class TestLlapInputSplit { + + @Test + public void testWritable() throws Exception { + int splitNum = 88; + byte[] planBytes = "0123456789987654321".getBytes(); + byte[] fragmentBytes = "abcdefghijklmnopqrstuvwxyz".getBytes(); + SplitLocationInfo[] locations = { + new SplitLocationInfo("location1", false), + new SplitLocationInfo("location2", false), + 
}; + + ArrayList<FieldDesc> colDescs = new ArrayList<FieldDesc>(); + colDescs.add(new FieldDesc("col1", new TypeDesc(TypeDesc.Type.STRING))); + colDescs.add(new FieldDesc("col2", new TypeDesc(TypeDesc.Type.INT))); + Schema schema = new Schema(colDescs); + + org.apache.hadoop.hive.llap.LlapInputSplit split1 = new org.apache.hadoop.hive.llap.LlapInputSplit( + splitNum, + planBytes, + fragmentBytes, + locations, + schema, + "hive"); + ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream(); + DataOutputStream dataOut = new DataOutputStream(byteOutStream); + split1.write(dataOut); + ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray()); + DataInputStream dataIn = new DataInputStream(byteInStream); + org.apache.hadoop.hive.llap.LlapInputSplit split2 = new org.apache.hadoop.hive.llap.LlapInputSplit(); + split2.readFields(dataIn); + + // Did we read all the data? + assertEquals(0, byteInStream.available()); + + checkLlapSplits(split1, split2); + + // Try JDBC LlapInputSplits + org.apache.hive.llap.ext.LlapInputSplit<Text> jdbcSplit1 = + new org.apache.hive.llap.ext.LlapInputSplit<Text>(split1, "org.apache.hadoop.hive.llap.LlapInputFormat"); + byteOutStream.reset(); + jdbcSplit1.write(dataOut); + byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray()); + dataIn = new DataInputStream(byteInStream); + org.apache.hive.llap.ext.LlapInputSplit<Text> jdbcSplit2 = new org.apache.hive.llap.ext.LlapInputSplit<Text>(); + jdbcSplit2.readFields(dataIn); + + assertEquals(0, byteInStream.available()); + + checkLlapSplits( + (org.apache.hadoop.hive.llap.LlapInputSplit) jdbcSplit1.getSplit(), + (org.apache.hadoop.hive.llap.LlapInputSplit) jdbcSplit2.getSplit()); + assertEquals(jdbcSplit1.getInputFormat().getClass(), jdbcSplit2.getInputFormat().getClass()); + } + + static void checkLlapSplits( + org.apache.hadoop.hive.llap.LlapInputSplit split1, + org.apache.hadoop.hive.llap.LlapInputSplit split2) throws Exception { + + assertEquals(split1.getSplitNum(), split2.getSplitNum()); + assertArrayEquals(split1.getPlanBytes(), split2.getPlanBytes()); + assertArrayEquals(split1.getFragmentBytes(), split2.getFragmentBytes()); + SplitLocationInfo[] locationInfo1 = split1.getLocationInfo(); + SplitLocationInfo[] locationInfo2 = split2.getLocationInfo(); + for (int idx = 0; idx < locationInfo1.length; ++idx) { + assertEquals(locationInfo1[idx].getLocation(), locationInfo2[idx].getLocation()); + assertEquals(locationInfo1[idx].isInMemory(), locationInfo2[idx].isInMemory()); + assertEquals(locationInfo1[idx].isOnDisk(), locationInfo2[idx].isOnDisk()); + } + assertArrayEquals(split1.getLocations(), split2.getLocations()); + assertEquals(split1.getSchema().toString(), split2.getSchema().toString()); + assertEquals(split1.getLlapUser(), split2.getLlapUser()); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java index deeac2e..5b4ba49 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java @@ -68,8 +68,8 @@ import org.apache.hadoop.io.Text; import org.apache.hive.jdbc.miniHS2.MiniHS2; import 
org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType; -import org.apache.hive.jdbc.LlapBaseInputFormat; -import org.apache.hive.jdbc.LlapRowInputFormat; +import org.apache.hadoop.hive.llap.LlapBaseInputFormat; +import org.apache.hadoop.hive.llap.LlapRowInputFormat; import org.datanucleus.ClassLoaderResolver; import org.datanucleus.NucleusContext; http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/jdbc/src/java/org/apache/hive/jdbc/LlapBaseInputFormat.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapBaseInputFormat.java b/jdbc/src/java/org/apache/hive/jdbc/LlapBaseInputFormat.java deleted file mode 100644 index a0ddeaa..0000000 --- a/jdbc/src/java/org/apache/hive/jdbc/LlapBaseInputFormat.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hive.jdbc; - -import java.util.ArrayList; -import java.util.List; - -import java.sql.SQLException; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.Statement; -import java.sql.DriverManager; - -import java.io.IOException; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.DataInputStream; -import java.io.ByteArrayInputStream; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.InputSplitWithLocationInfo; -import org.apache.hadoop.mapred.SplitLocationInfo; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.util.Progressable; - -import com.google.common.base.Preconditions; - -public class LlapBaseInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> { - - private static String driverName = "org.apache.hive.jdbc.HiveDriver"; - private String url; // "jdbc:hive2://localhost:10000/default" - private String user; // "hive", - private String pwd; // "" - private String query; - - public static final String URL_KEY = "llap.if.hs2.connection"; - public static final String QUERY_KEY = "llap.if.query"; - public static final String USER_KEY = "llap.if.user"; - public static final String PWD_KEY = "llap.if.pwd"; - - public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)"; - - private Connection con; - private Statement 
stmt; - - public LlapBaseInputFormat(String url, String user, String pwd, String query) { - this.url = url; - this.user = user; - this.pwd = pwd; - this.query = query; - } - - public LlapBaseInputFormat() {} - - - @Override - public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { - LlapInputSplit llapSplit = (LlapInputSplit) split; - return llapSplit.getInputFormat().getRecordReader(llapSplit.getSplit(), job, reporter); - } - - @Override - public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - List<InputSplit> ins = new ArrayList<InputSplit>(); - - if (url == null) url = job.get(URL_KEY); - if (query == null) query = job.get(QUERY_KEY); - if (user == null) user = job.get(USER_KEY); - if (pwd == null) pwd = job.get(PWD_KEY); - - if (url == null || query == null) { - throw new IllegalStateException(); - } - - try { - Class.forName(driverName); - } catch (ClassNotFoundException e) { - throw new IOException(e); - } - - try { - con = DriverManager.getConnection(url,user,pwd); - stmt = con.createStatement(); - String sql = String.format(SPLIT_QUERY, query, numSplits); - ResultSet res = stmt.executeQuery(sql); - while (res.next()) { - // deserialize split - DataInput in = new DataInputStream(res.getBinaryStream(3)); - InputSplitWithLocationInfo is = (InputSplitWithLocationInfo)Class.forName(res.getString(2)).newInstance(); - is.readFields(in); - ins.add(new LlapInputSplit(is, res.getString(1))); - } - - res.close(); - stmt.close(); - } catch (Exception e) { - throw new IOException(e); - } - return ins.toArray(new InputSplit[ins.size()]); - } - - public void close() { - try { - con.close(); - } catch (Exception e) { - // ignore - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java b/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java deleted file mode 100644 index 4c3c3ab..0000000 --- a/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hive.jdbc; - -import java.io.OutputStream; -import java.io.InputStream; -import java.io.File; -import java.io.IOException; -import java.io.FileInputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.GnuParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.Options; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.ql.io.RCFile.Reader; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.hive.llap.io.api.LlapProxy; -import org.apache.hadoop.hive.llap.LlapBaseRecordReader; -import org.apache.hadoop.hive.llap.Schema; - -public class LlapDump { - - private static final Logger LOG = LoggerFactory.getLogger(LlapDump.class); - - private static String url = "jdbc:hive2://localhost:10000/default"; - private static String user = "hive"; - private static String pwd = ""; - private static String query = "select * from test"; - private static String numSplits = "1"; - - public static void main(String[] args) throws Exception { - Options opts = createOptions(); - CommandLine cli = new GnuParser().parse(opts, args); - - if (cli.hasOption('h')) { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp("orcfiledump", opts); - return; - } - - if (cli.hasOption('l')) { - url = cli.getOptionValue("l"); - } - - if (cli.hasOption('u')) { - user = cli.getOptionValue("u"); - } - - if (cli.hasOption('p')) { - pwd = cli.getOptionValue("p"); - } - - if (cli.hasOption('n')) { - numSplits = cli.getOptionValue("n"); - } - - if (cli.getArgs().length > 0) { - query = cli.getArgs()[0]; - } - - System.out.println("url: "+url); - System.out.println("user: "+user); - System.out.println("query: "+query); - - LlapBaseInputFormat format = new LlapBaseInputFormat(url, user, pwd, query); - JobConf job = new JobConf(); - - InputSplit[] splits = format.getSplits(job, Integer.parseInt(numSplits)); - - if (splits.length == 0) { - System.out.println("No splits returned - empty scan"); - System.out.println("Results: "); - } else { - boolean first = true; - - for (InputSplit s: splits) { - LOG.info("Processing input split s from " + Arrays.toString(s.getLocations())); - RecordReader<NullWritable, Text> reader = format.getRecordReader(s, job, null); - - if (reader instanceof LlapBaseRecordReader && first) { - Schema schema = ((LlapBaseRecordReader)reader).getSchema(); - System.out.println(""+schema); - } - - if (first) { - System.out.println("Results: "); - System.out.println(""); - first = false; - } - - Text value = reader.createValue(); - while (reader.next(NullWritable.get(), value)) { - System.out.println(value); - } - } - System.exit(0); - } - } - - static Options createOptions() { - Options result = new Options(); - - result.addOption(OptionBuilder - .withLongOpt("location") - .withDescription("HS2 url") - .hasArg() - .create('l')); - - result.addOption(OptionBuilder - 
.withLongOpt("user") - .withDescription("user name") - .hasArg() - .create('u')); - - result.addOption(OptionBuilder - .withLongOpt("pwd") - .withDescription("password") - .hasArg() - .create('p')); - - result.addOption(OptionBuilder - .withLongOpt("num") - .withDescription("number of splits") - .hasArg() - .create('n')); - - return result; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/jdbc/src/java/org/apache/hive/jdbc/LlapInputSplit.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapInputSplit.java b/jdbc/src/java/org/apache/hive/jdbc/LlapInputSplit.java deleted file mode 100644 index 0f4fd4e..0000000 --- a/jdbc/src/java/org/apache/hive/jdbc/LlapInputSplit.java +++ /dev/null @@ -1,73 +0,0 @@ -package org.apache.hive.jdbc; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.InputSplitWithLocationInfo; -import org.apache.hadoop.mapred.SplitLocationInfo; - - -public class LlapInputSplit<V extends WritableComparable> implements InputSplitWithLocationInfo { - InputSplitWithLocationInfo nativeSplit; - String inputFormatClassName; - - public LlapInputSplit() { - } - - public LlapInputSplit(InputSplitWithLocationInfo nativeSplit, String inputFormatClassName) { - this.nativeSplit = nativeSplit; - this.inputFormatClassName = inputFormatClassName; - } - - @Override - public long getLength() throws IOException { - return nativeSplit.getLength(); - } - - @Override - public String[] getLocations() throws IOException { - return nativeSplit.getLocations(); - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeUTF(inputFormatClassName); - out.writeUTF(nativeSplit.getClass().getName()); - nativeSplit.write(out); - } - - @Override - public void readFields(DataInput in) throws IOException { - inputFormatClassName = in.readUTF(); - String splitClass = in.readUTF(); - try { - nativeSplit = (InputSplitWithLocationInfo)Class.forName(splitClass).newInstance(); - } catch (Exception e) { - throw new IOException(e); - } - nativeSplit.readFields(in); - } - - @Override - public SplitLocationInfo[] getLocationInfo() throws IOException { - return nativeSplit.getLocationInfo(); - } - - public InputSplit getSplit() { - return nativeSplit; - } - - public InputFormat<NullWritable, V> getInputFormat() throws IOException { - try { - return (InputFormat<NullWritable, V>) Class.forName(inputFormatClassName) - .newInstance(); - } catch(Exception e) { - throw new IOException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/jdbc/src/java/org/apache/hive/jdbc/LlapRowInputFormat.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapRowInputFormat.java b/jdbc/src/java/org/apache/hive/jdbc/LlapRowInputFormat.java deleted file mode 100644 index 1cca66a..0000000 --- a/jdbc/src/java/org/apache/hive/jdbc/LlapRowInputFormat.java +++ /dev/null @@ -1,34 +0,0 @@ -package org.apache.hive.jdbc; - -import java.io.IOException; - -import org.apache.hadoop.hive.llap.LlapBaseRecordReader; -import org.apache.hadoop.hive.llap.LlapRowRecordReader; -import org.apache.hadoop.hive.llap.Row; -import org.apache.hadoop.hive.llap.Schema; - -import org.apache.hadoop.io.NullWritable; -import 
org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; - -public class LlapRowInputFormat implements InputFormat<NullWritable, Row> { - LlapBaseInputFormat<Text> baseInputFormat = new LlapBaseInputFormat<Text>(); - - @Override - public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - return baseInputFormat.getSplits(job, numSplits); - } - - @Override - public RecordReader<NullWritable, Row> getRecordReader(InputSplit split, JobConf job, Reporter reporter) - throws IOException { - LlapInputSplit<Text> llapSplit = (LlapInputSplit<Text>) split; - LlapBaseRecordReader<Text> reader = (LlapBaseRecordReader<Text>) baseInputFormat.getRecordReader(llapSplit, job, reporter); - return new LlapRowRecordReader(job, reader.getSchema(), reader); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-client/pom.xml ---------------------------------------------------------------------- diff --git a/llap-client/pom.xml b/llap-client/pom.xml index 50c06a4..4a75bbb 100644 --- a/llap-client/pom.xml +++ b/llap-client/pom.xml @@ -109,6 +109,38 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-api</artifactId> + <version>${tez.version}</version> + <optional>true</optional> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-runtime-internals</artifactId> + <version>${tez.version}</version> + <optional>true</optional> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> + </dependency> </dependencies> <build> <sourceDirectory>${basedir}/src/java</sourceDirectory> http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java new file mode 100644 index 0000000..7073280 --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.DataInputStream; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.Schema; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.JobConf; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LlapBaseRecordReader<V extends WritableComparable> implements RecordReader<NullWritable, V> { + private static final Logger LOG = LoggerFactory.getLogger(LlapBaseRecordReader.class); + + DataInputStream din; + Schema schema; + Class<V> clazz; + + + protected Thread readerThread = null; + protected LinkedBlockingQueue<ReaderEvent> readerEvents = new LinkedBlockingQueue<ReaderEvent>(); + + public LlapBaseRecordReader(InputStream in, Schema schema, Class<V> clazz) { + din = new DataInputStream(in); + this.schema = schema; + this.clazz = clazz; + this.readerThread = Thread.currentThread(); + } + + public Schema getSchema() { + return schema; + } + + @Override + public void close() throws IOException { + din.close(); + } + + @Override + public long getPos() { return 0; } + + @Override + public float getProgress() { return 0f; } + + @Override + public NullWritable createKey() { + return NullWritable.get(); + } + + @Override + public V createValue() { + try { + return clazz.newInstance(); + } catch (Exception e) { + return null; + } + } + + @Override + public boolean next(NullWritable key, V value) throws IOException { + try { + // Need a way to know what thread to interrupt, since this is a blocking thread. + setReaderThread(Thread.currentThread()); + + value.readFields(din); + return true; + } catch (EOFException eof) { + // End of input. There should be a reader event available, or coming soon, so okay to be blocking call. + ReaderEvent event = getReaderEvent(); + switch (event.getEventType()) { + case DONE: + break; + default: + throw new IOException("Expected reader event with done status, but got " + + event.getEventType() + " with message " + event.getMessage()); + } + return false; + } catch (IOException io) { + if (Thread.interrupted()) { + // Either we were interrupted by one of: + // 1. handleEvent(), in which case there is a reader event waiting for us in the queue + // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming. + // Either way we should not try to block trying to read the reader events queue. + if (readerEvents.isEmpty()) { + // Case 2. + throw io; + } else { + // Case 1. Fail the reader, sending back the error we received from the reader event. + ReaderEvent event = getReaderEvent(); + switch (event.getEventType()) { + case ERROR: + throw new IOException("Received reader event error: " + event.getMessage()); + default: + throw new IOException("Got reader event type " + event.getEventType() + ", expected error event"); + } + } + } else { + // If we weren't interrupted, just propagate the error + throw io; + } + } + } + + /** + * Define success/error events which are passed to the reader from a different thread. 
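+ * (The umbilical responder is the producer: it calls handleEvent(ReaderEvent.doneEvent()) when the fragment
+ * completes and handleEvent(ReaderEvent.errorEvent(...)) on failure, the latter interrupting a blocked reader.)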
+ * The reader will check for these events on end of input and interruption of the reader thread. + */ + public static class ReaderEvent { + public enum EventType { + DONE, + ERROR + } + + protected final EventType eventType; + protected final String message; + + protected ReaderEvent(EventType type, String message) { + this.eventType = type; + this.message = message; + } + + public static ReaderEvent doneEvent() { + return new ReaderEvent(EventType.DONE, ""); + } + + public static ReaderEvent errorEvent(String message) { + return new ReaderEvent(EventType.ERROR, message); + } + + public EventType getEventType() { + return eventType; + } + + public String getMessage() { + return message; + } + } + + public void handleEvent(ReaderEvent event) { + switch (event.getEventType()) { + case DONE: + // Reader will check for the event queue upon the end of the input stream - no need to interrupt. + readerEvents.add(event); + break; + case ERROR: + readerEvents.add(event); + if (readerThread == null) { + throw new RuntimeException("Reader thread is unexpectedly null, during ReaderEvent error " + event.getMessage()); + } + // Reader is using a blocking socket .. interrupt it. + if (LOG.isDebugEnabled()) { + LOG.debug("Interrupting reader thread due to reader event with error " + event.getMessage()); + } + getReaderThread().interrupt(); + break; + default: + throw new RuntimeException("Unhandled ReaderEvent type " + event.getEventType() + " with message " + event.getMessage()); + } + } + + protected ReaderEvent getReaderEvent() { + try { + ReaderEvent event = readerEvents.take(); + return event; + } catch (InterruptedException ie) { + throw new RuntimeException("Interrupted while getting readerEvents, not expected", ie); + } + } + + protected synchronized void setReaderThread(Thread readerThread) { + this.readerThread = readerThread; + } + + protected synchronized Thread getReaderThread() { + return readerThread; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java new file mode 100644 index 0000000..0930d60 --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.llap; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; + +import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; + +import org.apache.commons.collections4.ListUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.LlapBaseRecordReader.ReaderEvent; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; +import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient; +import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder; +import org.apache.hadoop.hive.llap.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.apache.hadoop.hive.llap.tez.Converters; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.tez.common.security.JobTokenIdentifier; +import org.apache.tez.common.security.TokenCache; +import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; +import org.apache.tez.runtime.api.impl.TaskSpec; +import org.apache.tez.runtime.api.impl.TezEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.impl.EventType; +import org.apache.tez.runtime.api.impl.TezEvent; +import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; +import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; + + +public class LlapInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> { + + private static final Logger LOG = LoggerFactory.getLogger(LlapInputFormat.class); + + public LlapInputFormat() { + } + + /* + * This proxy record reader has the duty of establishing a connected socket with LLAP, then fire + * off the work in the split to LLAP and finally return the connected socket back in an + * LlapRecordReader. The LlapRecordReader class reads the results from the socket. + */ + @Override + public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, + Reporter reporter) throws IOException { + + LlapInputSplit llapSplit = (LlapInputSplit) split; + + // Set conf to use LLAP user rather than current user for LLAP Zk registry. 
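+ // (getServiceInstance() below resolves daemons via this registry, so the lookup must run under the user
+ // that registered the LLAP cluster; the split carries that user as llapUser.)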
+ HiveConf.setVar(job, HiveConf.ConfVars.LLAP_ZK_REGISTRY_USER, llapSplit.getLlapUser()); + SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes()); + + ServiceInstance serviceInstance = getServiceInstance(job, llapSplit); + String host = serviceInstance.getHost(); + int llapSubmitPort = serviceInstance.getRpcPort(); + + LOG.info("Found service instance for host " + host + " with rpc port " + llapSubmitPort + + " and outputformat port " + serviceInstance.getOutputFormatPort()); + + LlapRecordReaderTaskUmbilicalExternalResponder umbilicalResponder = + new LlapRecordReaderTaskUmbilicalExternalResponder(); + LlapTaskUmbilicalExternalClient llapClient = + new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(), + submitWorkInfo.getToken(), umbilicalResponder); + llapClient.init(job); + llapClient.start(); + + SubmitWorkRequestProto submitWorkRequestProto = + constructSubmitWorkRequestProto(submitWorkInfo, llapSplit.getSplitNum(), + llapClient.getAddress(), submitWorkInfo.getToken()); + + TezEvent tezEvent = new TezEvent(); + DataInputBuffer dib = new DataInputBuffer(); + dib.reset(llapSplit.getFragmentBytes(), 0, llapSplit.getFragmentBytes().length); + tezEvent.readFields(dib); + List<TezEvent> tezEventList = Lists.newArrayList(); + tezEventList.add(tezEvent); + + llapClient.submitWork(submitWorkRequestProto, host, llapSubmitPort, tezEventList); + + String id = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID) + "_" + llapSplit.getSplitNum(); + + HiveConf conf = new HiveConf(); + Socket socket = new Socket(host, + serviceInstance.getOutputFormatPort()); + + LOG.debug("Socket connected"); + + socket.getOutputStream().write(id.getBytes()); + socket.getOutputStream().write(0); + socket.getOutputStream().flush(); + + LOG.info("Registered id: " + id); + + LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class); + umbilicalResponder.setRecordReader(recordReader); + return recordReader; + } + + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + throw new IOException("These are not the splits you are looking for."); + } + + private ServiceInstance getServiceInstance(JobConf job, LlapInputSplit llapSplit) throws IOException { + LlapRegistryService registryService = LlapRegistryService.getClient(job); + String host = llapSplit.getLocations()[0]; + + ServiceInstance serviceInstance = getServiceInstanceForHost(registryService, host); + if (serviceInstance == null) { + throw new IOException("No service instances found for " + host + " in registry"); + } + + return serviceInstance; + } + + private ServiceInstance getServiceInstanceForHost(LlapRegistryService registryService, String host) throws IOException { + InetAddress address = InetAddress.getByName(host); + ServiceInstanceSet instanceSet = registryService.getInstances(); + ServiceInstance serviceInstance = null; + + // The name used in the service registry may not match the host name we're using. 
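+ // (e.g. a split location recorded as "node1" may have been registered under "node1.example.com")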
+ // Try hostname/canonical hostname/host address + + String name = address.getHostName(); + LOG.info("Searching service instance by hostname " + name); + serviceInstance = selectServiceInstance(instanceSet.getByHost(name)); + if (serviceInstance != null) { + return serviceInstance; + } + + name = address.getCanonicalHostName(); + LOG.info("Searching service instance by canonical hostname " + name); + serviceInstance = selectServiceInstance(instanceSet.getByHost(name)); + if (serviceInstance != null) { + return serviceInstance; + } + + name = address.getHostAddress(); + LOG.info("Searching service instance by address " + name); + serviceInstance = selectServiceInstance(instanceSet.getByHost(name)); + if (serviceInstance != null) { + return serviceInstance; + } + + return serviceInstance; + } + + private ServiceInstance selectServiceInstance(Set<ServiceInstance> serviceInstances) { + if (serviceInstances == null || serviceInstances.isEmpty()) { + return null; + } + + // Get the first live service instance + for (ServiceInstance serviceInstance : serviceInstances) { + if (serviceInstance.isAlive()) { + return serviceInstance; + } + } + + LOG.info("No live service instances were found"); + return null; + } + + private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo, + int taskNum, + InetSocketAddress address, + Token<JobTokenIdentifier> token) throws + IOException { + TaskSpec taskSpec = submitWorkInfo.getTaskSpec(); + ApplicationId appId = submitWorkInfo.getFakeAppId(); + + SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder(); + // This works, assuming the executor is running within YARN. + LOG.info("Setting user in submitWorkRequest to: " + + System.getenv(ApplicationConstants.Environment.USER.name())); + builder.setUser(System.getenv(ApplicationConstants.Environment.USER.name())); + builder.setApplicationIdString(appId.toString()); + builder.setAppAttemptNumber(0); + builder.setTokenIdentifier(appId.toString()); + + ContainerId containerId = + ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum); + builder.setContainerIdString(containerId.toString()); + + builder.setAmHost(address.getHostName()); + builder.setAmPort(address.getPort()); + Credentials taskCredentials = new Credentials(); + // Credentials can change across DAGs. Ideally construct only once per DAG. + // TODO Figure out where credentials will come from. Normally Hive sets up + // URLs on the tez dag, for which Tez acquires credentials. 
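+ // For now only the umbilical session token from SubmitWorkInfo is serialized into the request (see
+ // TokenCache.setSessionToken() below); the commented-out code sketches how DAG credentials are
+ // propagated when running inside the AM.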
+ + // taskCredentials.addAll(getContext().getCredentials()); + + // Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() == + // taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId()); + // ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto); + // if (credentialsBinary == null) { + // credentialsBinary = serializeCredentials(getContext().getCredentials()); + // credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate()); + // } else { + // credentialsBinary = credentialsBinary.duplicate(); + // } + // builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary)); + Credentials credentials = new Credentials(); + TokenCache.setSessionToken(token, credentials); + ByteBuffer credentialsBinary = serializeCredentials(credentials); + builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary)); + + + builder.setFragmentSpec(Converters.convertTaskSpecToProto(taskSpec)); + + FragmentRuntimeInfo.Builder runtimeInfo = FragmentRuntimeInfo.newBuilder(); + runtimeInfo.setCurrentAttemptStartTime(System.currentTimeMillis()); + runtimeInfo.setWithinDagPriority(0); + runtimeInfo.setDagStartTime(submitWorkInfo.getCreationTime()); + runtimeInfo.setFirstAttemptStartTime(submitWorkInfo.getCreationTime()); + runtimeInfo.setNumSelfAndUpstreamTasks(taskSpec.getVertexParallelism()); + runtimeInfo.setNumSelfAndUpstreamCompletedTasks(0); + + + builder.setUsingTezAm(false); + builder.setFragmentRuntimeInfo(runtimeInfo.build()); + return builder.build(); + } + + private ByteBuffer serializeCredentials(Credentials credentials) throws IOException { + Credentials containerCredentials = new Credentials(); + containerCredentials.addAll(credentials); + DataOutputBuffer containerTokens_dob = new DataOutputBuffer(); + containerCredentials.writeTokenStorageToStream(containerTokens_dob); + return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength()); + } + + private static class LlapRecordReaderTaskUmbilicalExternalResponder implements LlapTaskUmbilicalExternalResponder { + protected LlapBaseRecordReader recordReader = null; + protected LinkedBlockingQueue<ReaderEvent> queuedEvents = new LinkedBlockingQueue<ReaderEvent>(); + + public LlapRecordReaderTaskUmbilicalExternalResponder() { + } + + @Override + public void submissionFailed(String fragmentId, Throwable throwable) { + try { + sendOrQueueEvent(ReaderEvent.errorEvent( + "Received submission failed event for fragment ID " + fragmentId)); + } catch (Exception err) { + LOG.error("Error during heartbeat responder:", err); + } + } + + @Override + public void heartbeat(TezHeartbeatRequest request) { + TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID(); + List<TezEvent> inEvents = request.getEvents(); + for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) { + EventType eventType = tezEvent.getEventType(); + try { + switch (eventType) { + case TASK_ATTEMPT_COMPLETED_EVENT: + sendOrQueueEvent(ReaderEvent.doneEvent()); + break; + case TASK_ATTEMPT_FAILED_EVENT: + TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent) tezEvent.getEvent(); + sendOrQueueEvent(ReaderEvent.errorEvent(taskFailedEvent.getDiagnostics())); + break; + case TASK_STATUS_UPDATE_EVENT: + // If we want to handle counters + break; + default: + LOG.warn("Unhandled event type " + eventType); + break; + } + } catch (Exception err) { + LOG.error("Error during heartbeat responder:", err); + } + } + } + + @Override + public void taskKilled(TezTaskAttemptID 
taskAttemptId) { + try { + sendOrQueueEvent(ReaderEvent.errorEvent( + "Received task killed event for task ID " + taskAttemptId)); + } catch (Exception err) { + LOG.error("Error during heartbeat responder:", err); + } + } + + @Override + public void heartbeatTimeout(String taskAttemptId) { + try { + sendOrQueueEvent(ReaderEvent.errorEvent( + "Timed out waiting for heartbeat for task ID " + taskAttemptId)); + } catch (Exception err) { + LOG.error("Error during heartbeat responder:", err); + } + } + + public synchronized LlapBaseRecordReader getRecordReader() { + return recordReader; + } + + public synchronized void setRecordReader(LlapBaseRecordReader recordReader) { + this.recordReader = recordReader; + + if (recordReader == null) { + return; + } + + // If any events were queued by the responder, give them to the record reader now. + while (!queuedEvents.isEmpty()) { + ReaderEvent readerEvent = queuedEvents.poll(); + LOG.debug("Sending queued event to record reader: " + readerEvent.getEventType()); + recordReader.handleEvent(readerEvent); + } + } + + /** + * Send the ReaderEvents to the record reader, if it is registered to this responder. + * If there is no registered record reader, add them to a list of pending reader events + * since we don't want to drop these events. + * @param readerEvent + */ + protected synchronized void sendOrQueueEvent(ReaderEvent readerEvent) { + LlapBaseRecordReader recordReader = getRecordReader(); + if (recordReader != null) { + recordReader.handleEvent(readerEvent); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("No registered record reader, queueing event " + readerEvent.getEventType() + + " with message " + readerEvent.getMessage()); + } + + try { + queuedEvents.put(readerEvent); + } catch (Exception err) { + throw new RuntimeException("Unexpected exception while queueing reader event", err); + } + } + } + + /** + * Clear the list of queued reader events if we are not interested in sending any pending events to any registering record reader. + */ + public void clearQueuedEvents() { + queuedEvents.clear(); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java new file mode 100644 index 0000000..02aedfd --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.llap; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.hive.llap.Schema; +import org.apache.hadoop.mapred.InputSplitWithLocationInfo; +import org.apache.hadoop.mapred.SplitLocationInfo; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.TSerializer; + +public class LlapInputSplit implements InputSplitWithLocationInfo { + + int splitNum; + byte[] planBytes; + byte[] fragmentBytes; + SplitLocationInfo[] locations; + Schema schema; + String llapUser; + + public LlapInputSplit() { + } + + public LlapInputSplit(int splitNum, byte[] planBytes, byte[] fragmentBytes, SplitLocationInfo[] locations, Schema schema, String llapUser) { + this.planBytes = planBytes; + this.fragmentBytes = fragmentBytes; + this.locations = locations; + this.schema = schema; + this.splitNum = splitNum; + this.llapUser = llapUser; + } + + public Schema getSchema() { + return schema; + } + + @Override + public long getLength() throws IOException { + return 0; + } + + @Override + public String[] getLocations() throws IOException { + String[] locs = new String[locations.length]; + for (int i = 0; i < locations.length; ++i) { + locs[i] = locations[i].getLocation(); + } + return locs; + } + + public int getSplitNum() { + return splitNum; + } + + public byte[] getPlanBytes() { + return planBytes; + } + + public byte[] getFragmentBytes() { + return fragmentBytes; + } + + + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(splitNum); + out.writeInt(planBytes.length); + out.write(planBytes); + + out.writeInt(fragmentBytes.length); + out.write(fragmentBytes); + + out.writeInt(locations.length); + for (int i = 0; i < locations.length; ++i) { + out.writeUTF(locations[i].getLocation()); + } + + schema.write(out); + out.writeUTF(llapUser); + } + + @Override + public void readFields(DataInput in) throws IOException { + splitNum = in.readInt(); + int length = in.readInt(); + planBytes = new byte[length]; + in.readFully(planBytes); + + length = in.readInt(); + fragmentBytes = new byte[length]; + in.readFully(fragmentBytes); + + length = in.readInt(); + locations = new SplitLocationInfo[length]; + + for (int i = 0; i < length; ++i) { + locations[i] = new SplitLocationInfo(in.readUTF(), false); + } + + schema = new Schema(); + schema.readFields(in); + llapUser = in.readUTF(); + } + + @Override + public SplitLocationInfo[] getLocationInfo() throws IOException { + return locations; + } + + public String getLlapUser() { + return llapUser; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java new file mode 100644 index 0000000..4e000ff --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java @@ -0,0 +1,155 @@ +package org.apache.hadoop.hive.llap; + +import com.google.common.base.Preconditions; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import 
http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
new file mode 100644
index 0000000..4e000ff
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
@@ -0,0 +1,155 @@
+package org.apache.hadoop.hive.llap;
+
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
+
+import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.hive.llap.FieldDesc;
+import org.apache.hadoop.hive.llap.Schema;
+import org.apache.hadoop.hive.llap.TypeDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
+import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LlapRowRecordReader implements RecordReader<NullWritable, Row> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LlapRowRecordReader.class);
+
+  Configuration conf;
+  RecordReader<NullWritable, Text> reader;
+  Schema schema;
+  SerDe serde;
+  final Text textData = new Text();
+
+  public LlapRowRecordReader(Configuration conf, Schema schema,
+      RecordReader<NullWritable, Text> reader) {
+    this.conf = conf;
+    this.schema = schema;
+    this.reader = reader;
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
+
+  @Override
+  public NullWritable createKey() {
+    return NullWritable.get();
+  }
+
+  @Override
+  public Row createValue() {
+    return new Row(schema);
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return 0;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return 0;
+  }
+
+  @Override
+  public boolean next(NullWritable key, Row value) throws IOException {
+    Preconditions.checkArgument(value != null);
+
+    if (serde == null) {
+      try {
+        serde = initSerDe(conf);
+      } catch (SerDeException err) {
+        throw new IOException(err);
+      }
+    }
+
+    boolean hasNext = reader.next(key, textData);
+    if (hasNext) {
+      // Deserialize Text to column values, and populate the row record
+      Object rowObj;
+      try {
+        StructObjectInspector rowOI = (StructObjectInspector) serde.getObjectInspector();
+        rowObj = serde.deserialize(textData);
+        List<? extends StructField> colFields = rowOI.getAllStructFieldRefs();
+        for (int idx = 0; idx < colFields.size(); ++idx) {
+          StructField field = colFields.get(idx);
+          Object colValue = rowOI.getStructFieldData(rowObj, field);
+          Preconditions.checkState(field.getFieldObjectInspector().getCategory() == Category.PRIMITIVE,
+              "Cannot handle non-primitive column type " + field.getFieldObjectInspector().getTypeName());
+
+          PrimitiveObjectInspector poi = (PrimitiveObjectInspector) field.getFieldObjectInspector();
+          // char/varchar special cased here since the row record handles them using Text
+          switch (poi.getPrimitiveCategory()) {
+            case CHAR:
+              value.setValue(idx, ((HiveCharWritable) poi.getPrimitiveWritableObject(colValue)).getPaddedValue());
+              break;
+            case VARCHAR:
+              value.setValue(idx, ((HiveVarcharWritable) poi.getPrimitiveWritableObject(colValue)).getTextValue());
+              break;
+            default:
+              value.setValue(idx, (Writable) poi.getPrimitiveWritableObject(colValue));
+              break;
+          }
+        }
+      } catch (SerDeException err) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Error deserializing row from text: " + textData);
+        }
+        throw new IOException("Error deserializing row data", err);
+      }
+    }
+
+    return hasNext;
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  protected SerDe initSerDe(Configuration conf) throws SerDeException {
+    Properties props = new Properties();
+    StringBuffer columnsBuffer = new StringBuffer();
+    StringBuffer typesBuffer = new StringBuffer();
+    boolean isFirst = true;
+    for (FieldDesc colDesc : schema.getColumns()) {
+      if (!isFirst) {
+        columnsBuffer.append(',');
+        typesBuffer.append(',');
+      }
+      columnsBuffer.append(colDesc.getName());
+      typesBuffer.append(colDesc.getTypeDesc().toString());
+      isFirst = false;
+    }
+    String columns = columnsBuffer.toString();
+    String types = typesBuffer.toString();
+    props.put(serdeConstants.LIST_COLUMNS, columns);
+    props.put(serdeConstants.LIST_COLUMN_TYPES, types);
+    SerDe serde = new LazySimpleSerDe();
+    serde.initialize(conf, props);
+
+    return serde;
+  }
+}
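
As a usage sketch, the reader is driven like any mapred RecordReader. This assumes the Text-based reader and the Schema come from the LLAP input format rather than being built by hand, and assumes Row's positional getValue accessor:

  // Hypothetical driver loop; conf, schema and textReader are supplied by the caller.
  static void dumpFirstColumn(Configuration conf, Schema schema,
      RecordReader<NullWritable, Text> textReader) throws IOException {
    LlapRowRecordReader rows = new LlapRowRecordReader(conf, schema, textReader);
    NullWritable key = rows.createKey();
    Row row = rows.createValue();
    while (rows.next(key, row)) {
      // Columns are positional; names and types come from the Schema.
      System.out.println(row.getValue(0));
    }
    rows.close();
  }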
http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java b/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
new file mode 100644
index 0000000..83149ab
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
@@ -0,0 +1,103 @@
+package org.apache.hadoop.hive.llap;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+
+public class SubmitWorkInfo implements Writable {
+
+  private TaskSpec taskSpec;
+  private ApplicationId fakeAppId;
+  private long creationTime;
+
+  // This token is used to communicate over the LlapTaskUmbilicalProtocol. It is not related to
+  // the tokens used to talk to the LLAP daemons themselves via the security framework.
+  private Token<JobTokenIdentifier> token;
+
+  public SubmitWorkInfo(TaskSpec taskSpec, ApplicationId fakeAppId, long creationTime) {
+    this.taskSpec = taskSpec;
+    this.fakeAppId = fakeAppId;
+    this.token = createJobToken();
+    this.creationTime = creationTime;
+  }
+
+  // Empty constructor for Writable deserialization, etc.
+  public SubmitWorkInfo() {
+  }
+
+  public TaskSpec getTaskSpec() {
+    return taskSpec;
+  }
+
+  public ApplicationId getFakeAppId() {
+    return fakeAppId;
+  }
+
+  public String getTokenIdentifier() {
+    return fakeAppId.toString();
+  }
+
+  public Token<JobTokenIdentifier> getToken() {
+    return token;
+  }
+
+  public long getCreationTime() {
+    return creationTime;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    taskSpec.write(out);
+    out.writeLong(fakeAppId.getClusterTimestamp());
+    out.writeInt(fakeAppId.getId());
+    token.write(out);
+    out.writeLong(creationTime);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    taskSpec = new TaskSpec();
+    taskSpec.readFields(in);
+    long appIdTs = in.readLong();
+    int appIdId = in.readInt();
+    fakeAppId = ApplicationId.newInstance(appIdTs, appIdId);
+    token = new Token<>();
+    token.readFields(in);
+    creationTime = in.readLong();
+  }
+
+  public static byte[] toBytes(SubmitWorkInfo submitWorkInfo) throws IOException {
+    DataOutputBuffer dob = new DataOutputBuffer();
+    submitWorkInfo.write(dob);
+    return dob.getData();
+  }
+
+  public static SubmitWorkInfo fromBytes(byte[] submitWorkInfoBytes) throws IOException {
+    DataInputBuffer dib = new DataInputBuffer();
+    dib.reset(submitWorkInfoBytes, 0, submitWorkInfoBytes.length);
+    SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo();
+    submitWorkInfo.readFields(dib);
+    return submitWorkInfo;
+  }
+
+  private Token<JobTokenIdentifier> createJobToken() {
+    String tokenIdentifier = fakeAppId.toString();
+    JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(tokenIdentifier));
+    Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier,
+        new JobTokenSecretManager());
+    sessionToken.setService(identifier.getJobId());
+    return sessionToken;
+  }
+}
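
The static toBytes()/fromBytes() helpers make the serialization round trip explicit; a minimal sketch, assuming taskSpec is a fully built Tez TaskSpec (normally produced by split generation):

  // Hypothetical round-trip check for SubmitWorkInfo; taskSpec is a placeholder.
  static void roundTrip(TaskSpec taskSpec) throws IOException {
    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
    SubmitWorkInfo info = new SubmitWorkInfo(taskSpec, appId, System.currentTimeMillis());
    byte[] payload = SubmitWorkInfo.toBytes(info);
    SubmitWorkInfo restored = SubmitWorkInfo.fromBytes(payload);
    assert restored.getFakeAppId().equals(appId);
  }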
http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
new file mode 100644
index 0000000..7d06637
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -0,0 +1,415 @@
+package org.apache.hadoop.hive.llap.ext;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.collections4.ListUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
+import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
+import org.apache.hadoop.hive.llap.tezplugins.helpers.LlapTaskUmbilicalServer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.impl.EventType;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LlapTaskUmbilicalExternalClient extends AbstractService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalExternalClient.class);
+
+  private final LlapProtocolClientProxy communicator;
+  private volatile LlapTaskUmbilicalServer llapTaskUmbilicalServer;
+  private final Configuration conf;
+  private final LlapTaskUmbilicalProtocol umbilical;
+
+  protected final String tokenIdentifier;
+  protected final Token<JobTokenIdentifier> sessionToken;
+
+  private final ConcurrentMap<String, PendingEventData> pendingEvents = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, TaskHeartbeatInfo> registeredTasks =
+      new ConcurrentHashMap<String, TaskHeartbeatInfo>();
+  private LlapTaskUmbilicalExternalResponder responder = null;
+  private final ScheduledThreadPoolExecutor timer;
+  private final long connectionTimeout;
+
+  private static class TaskHeartbeatInfo {
+    final String taskAttemptId;
+    final String hostname;
+    final int port;
+    final AtomicLong lastHeartbeat = new AtomicLong();
+
+    public TaskHeartbeatInfo(String taskAttemptId, String hostname, int port) {
+      this.taskAttemptId = taskAttemptId;
+      this.hostname = hostname;
+      this.port = port;
+      this.lastHeartbeat.set(System.currentTimeMillis());
+    }
+  }
+
+  private static class PendingEventData {
+    final TaskHeartbeatInfo heartbeatInfo;
+    final List<TezEvent> tezEvents;
+
+    public PendingEventData(TaskHeartbeatInfo heartbeatInfo, List<TezEvent> tezEvents) {
+      this.heartbeatInfo = heartbeatInfo;
+      this.tezEvents = tezEvents;
+    }
+  }
+
+  // TODO KKK Work out the details of the tokenIdentifier, and the session token.
+  // It may just be possible to create one here - since Shuffle is not involved, and this is only
+  // used for communication from LLAP-Daemons to the server. It will need to be sent in as part
+  // of the job submission request.
+  public LlapTaskUmbilicalExternalClient(Configuration conf, String tokenIdentifier,
+      Token<JobTokenIdentifier> sessionToken, LlapTaskUmbilicalExternalResponder responder) {
+    super(LlapTaskUmbilicalExternalClient.class.getName());
+    this.conf = conf;
+    this.umbilical = new LlapTaskUmbilicalExternalImpl();
+    this.tokenIdentifier = tokenIdentifier;
+    this.sessionToken = sessionToken;
+    this.responder = responder;
+    this.timer = new ScheduledThreadPoolExecutor(1);
+    this.connectionTimeout = HiveConf.getTimeVar(conf,
+        HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+    // TODO: No support for the LLAP token yet. Add support for a configurable number of
+    // threads, though 1 should always be enough.
+    this.communicator = new LlapProtocolClientProxy(1, conf, null);
+    this.communicator.init(conf);
+  }
+
+  @Override
+  public void serviceStart() throws IOException {
+    int numHandlers = HiveConf.getIntVar(conf,
+        HiveConf.ConfVars.LLAP_TMP_EXT_CLIENT_NUM_SERVER_HANDLERS);
+    llapTaskUmbilicalServer =
+        new LlapTaskUmbilicalServer(conf, umbilical, numHandlers, tokenIdentifier, sessionToken);
+    communicator.start();
+  }
+
+  @Override
+  public void serviceStop() {
+    llapTaskUmbilicalServer.shutdownServer();
+    timer.shutdown();
+    if (this.communicator != null) {
+      this.communicator.stop();
+    }
+  }
+
+  public InetSocketAddress getAddress() {
+    return llapTaskUmbilicalServer.getAddress();
+  }
+
+  /**
+   * Submit the work for actual execution. This should always have the usingTezAm flag disabled.
+   * @param submitWorkRequestProto the fragment spec and payload to run on the LLAP daemon
+   */
+  public void submitWork(final SubmitWorkRequestProto submitWorkRequestProto, String llapHost,
+      int llapPort, List<TezEvent> tezEvents) {
+    Preconditions.checkArgument(!submitWorkRequestProto.getUsingTezAm());
+
+    // Register the pending events to be sent for this spec.
+    String fragmentId = submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString();
+    PendingEventData pendingEventData = new PendingEventData(
+        new TaskHeartbeatInfo(fragmentId, llapHost, llapPort), tezEvents);
+    pendingEvents.putIfAbsent(fragmentId, pendingEventData);
+
+    // Set up a timer task to check for heartbeat timeouts.
+    timer.scheduleAtFixedRate(new HeartbeatCheckTask(),
+        connectionTimeout, connectionTimeout, TimeUnit.MILLISECONDS);
+
+    // Send out the actual SubmitWorkRequest.
+    communicator.sendSubmitWork(submitWorkRequestProto, llapHost, llapPort,
+        new LlapProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.SubmitWorkResponseProto>() {
+
+          @Override
+          public void setResponse(LlapDaemonProtocolProtos.SubmitWorkResponseProto response) {
+            if (response.hasSubmissionState()) {
+              if (response.getSubmissionState().equals(
+                  LlapDaemonProtocolProtos.SubmissionStateProto.REJECTED)) {
+                String msg = "Fragment: "
+                    + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString()
+                    + " rejected. Server Busy.";
+                LOG.info(msg);
+                if (responder != null) {
+                  Throwable err = new RuntimeException(msg);
+                  responder.submissionFailed(
+                      submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), err);
+                }
+                return;
+              }
+            }
+          }
+
+          @Override
+          public void indicateError(Throwable t) {
+            String msg = "Failed to submit: "
+                + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString();
+            LOG.error(msg, t);
+            Throwable err = new RuntimeException(msg, t);
+            responder.submissionFailed(
+                submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), err);
+          }
+        });
+
+//    // TODO Also send out information saying that the fragment is finishable - if that is not
+//    // already included in the main fragment. This entire call is only required if we're doing
+//    // more than scans; MRInput has no dependencies and is always finishable.
+//    QueryIdentifierProto queryIdentifier = QueryIdentifierProto
+//        .newBuilder()
+//        .setAppIdentifier(submitWorkRequestProto.getApplicationIdString())
+//        .setDagIdentifier(submitWorkRequestProto.getFragmentSpec().getDagId())
+//        .build();
+//    LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto sourceStateUpdatedRequest =
+//        LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.newBuilder()
+//            .setQueryIdentifier(queryIdentifier)
+//            .setState(LlapDaemonProtocolProtos.SourceStateProto.S_SUCCEEDED).
+//            setSrcName(TODO)
+//    communicator.sendSourceStateUpdate(LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString()).set);
+  }
+
+  private void updateHeartbeatInfo(String taskAttemptId) {
+    int updateCount = 0;
+
+    PendingEventData pendingEventData = pendingEvents.get(taskAttemptId);
+    if (pendingEventData != null) {
+      pendingEventData.heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
+      updateCount++;
+    }
+
+    TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(taskAttemptId);
+    if (heartbeatInfo != null) {
+      heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
+      updateCount++;
+    }
+
+    if (updateCount == 0) {
+      LOG.warn("No tasks found for heartbeat from taskAttemptId " + taskAttemptId);
+    }
+  }
+
+  private void updateHeartbeatInfo(String hostname, int port) {
+    int updateCount = 0;
+
+    for (String key : pendingEvents.keySet()) {
+      PendingEventData pendingEventData = pendingEvents.get(key);
+      if (pendingEventData != null) {
+        if (pendingEventData.heartbeatInfo.hostname.equals(hostname)
+            && pendingEventData.heartbeatInfo.port == port) {
+          pendingEventData.heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
+          updateCount++;
+        }
+      }
+    }
+
+    for (String key : registeredTasks.keySet()) {
+      TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(key);
+      if (heartbeatInfo != null) {
+        if (heartbeatInfo.hostname.equals(hostname)
+            && heartbeatInfo.port == port) {
+          heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
+          updateCount++;
+        }
+      }
+    }
+
+    if (updateCount == 0) {
+      LOG.info("No tasks found for heartbeat from hostname " + hostname + ", port " + port);
+    }
+  }
+
+  private class HeartbeatCheckTask implements Runnable {
+    public void run() {
+      long currentTime = System.currentTimeMillis();
+      List<String> timedOutTasks = new ArrayList<String>();
+
+      // Check both pending and registered tasks for timeouts.
+      for (String key : pendingEvents.keySet()) {
+        PendingEventData pendingEventData = pendingEvents.get(key);
+        if (pendingEventData != null) {
+          if (currentTime - pendingEventData.heartbeatInfo.lastHeartbeat.get() >= connectionTimeout) {
+            timedOutTasks.add(key);
+          }
+        }
+      }
+      for (String timedOutTask : timedOutTasks) {
+        LOG.info("Pending taskAttemptId " + timedOutTask + " timed out");
+        responder.heartbeatTimeout(timedOutTask);
+        pendingEvents.remove(timedOutTask);
+        // TODO: Do we need to tell the LLAP daemon we are no longer interested in this task?
+      }
+
+      timedOutTasks.clear();
+      for (String key : registeredTasks.keySet()) {
+        TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(key);
+        if (heartbeatInfo != null) {
+          if (currentTime - heartbeatInfo.lastHeartbeat.get() >= connectionTimeout) {
+            timedOutTasks.add(key);
+          }
+        }
+      }
+      for (String timedOutTask : timedOutTasks) {
+        LOG.info("Running taskAttemptId " + timedOutTask + " timed out");
+        responder.heartbeatTimeout(timedOutTask);
+        registeredTasks.remove(timedOutTask);
+        // TODO: Do we need to tell the LLAP daemon we are no longer interested in this task?
+      }
+    }
+  }
+
+  public interface LlapTaskUmbilicalExternalResponder {
+    void submissionFailed(String fragmentId, Throwable throwable);
+    void heartbeat(TezHeartbeatRequest request);
+    void taskKilled(TezTaskAttemptID taskAttemptId);
+    void heartbeatTimeout(String fragmentId);
+  }
+
+  // TODO Ideally, the server should be shared across all client sessions running on the same node.
+  private class LlapTaskUmbilicalExternalImpl implements LlapTaskUmbilicalProtocol {
+
+    @Override
+    public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
+      // Expecting only a single instance of a task to be running.
+      return true;
+    }
+
+    @Override
+    public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException,
+        TezException {
+      // Keep-alive information. The client should be informed and will have to take care of
+      // re-submitting the work. Some parts of fault tolerance go here.
+
+      // This also provides completion information, and a possible notification when the task
+      // actually starts running (first heartbeat).
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Received heartbeat from container, request=" + request);
+      }
+
+      // Incoming events can be ignored until the point when shuffle needs to be handled,
+      // instead of just scans.
+      TezHeartbeatResponse response = new TezHeartbeatResponse();
+
+      response.setLastRequestId(request.getRequestId());
+      // Assuming TaskAttemptId and FragmentIdentifierString are the same. Verify this.
+      TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
+      String taskAttemptIdString = taskAttemptId.toString();
+
+      updateHeartbeatInfo(taskAttemptIdString);
+
+      List<TezEvent> tezEvents = null;
+      PendingEventData pendingEventData = pendingEvents.remove(taskAttemptIdString);
+      if (pendingEventData == null) {
+        tezEvents = Collections.emptyList();
+
+        // If this heartbeat was not from a pending event and it's not in our list of registered
+        // tasks, tell the task to die.
+        if (!registeredTasks.containsKey(taskAttemptIdString)) {
+          LOG.info("Unexpected heartbeat from " + taskAttemptIdString);
+          response.setShouldDie(); // Do any of the other fields need to be set?
+          return response;
+        }
+      } else {
+        tezEvents = pendingEventData.tezEvents;
+        // Tasks removed from the pending list should then be added to the registered list.
+        registeredTasks.put(taskAttemptIdString, pendingEventData.heartbeatInfo);
+      }
+
+      // Event ids are irrelevant here; they can be tracked in the AM itself, instead of
+      // polluting the task. Also, since we have all the MRInput events here, they'll all be
+      // sent in together.
+      response.setNextFromEventId(0); // Irrelevant. See comment above.
+      response.setNextPreRoutedEventId(0); // Irrelevant. See comment above.
+      response.setEvents(tezEvents);
+
+      List<TezEvent> inEvents = request.getEvents();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Heartbeat from " + taskAttemptIdString +
+            " events: " + (inEvents != null ? inEvents.size() : -1));
+      }
+      for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
+        EventType eventType = tezEvent.getEventType();
+        switch (eventType) {
+          case TASK_ATTEMPT_COMPLETED_EVENT:
+            LOG.debug("Task completed event for " + taskAttemptIdString);
+            registeredTasks.remove(taskAttemptIdString);
+            break;
+          case TASK_ATTEMPT_FAILED_EVENT:
+            LOG.debug("Task failed event for " + taskAttemptIdString);
+            registeredTasks.remove(taskAttemptIdString);
+            break;
+          case TASK_STATUS_UPDATE_EVENT:
+            // If we want to handle counters
+            LOG.debug("Task update event for " + taskAttemptIdString);
+            break;
+          default:
+            LOG.warn("Unhandled event type " + eventType);
+            break;
+        }
+      }
+
+      // Pass the request on to the responder.
+      try {
+        if (responder != null) {
+          responder.heartbeat(request);
+        }
+      } catch (Exception err) {
+        LOG.error("Error during responder execution", err);
+      }
+
+      return response;
+    }
+
+    @Override
+    public void nodeHeartbeat(Text hostname, int port) throws IOException {
+      updateHeartbeatInfo(hostname.toString(), port);
+      // No need to propagate this to the responder.
+    }
+
+    @Override
+    public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException {
+      String taskAttemptIdString = taskAttemptId.toString();
+      LOG.error("Task killed - " + taskAttemptIdString);
+      registeredTasks.remove(taskAttemptIdString);
+
+      try {
+        if (responder != null) {
+          responder.taskKilled(taskAttemptId);
+        }
+      } catch (Exception err) {
+        LOG.error("Error during responder execution", err);
+      }
+    }
+
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+      return 0;
+    }
+
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
+        int clientMethodsHash) throws IOException {
+      return ProtocolSignature.getProtocolSignature(this, protocol,
+          clientVersion, clientMethodsHash);
+    }
+  }
+
+}
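
A caller would typically wire the client up as below. This is a sketch under the assumption that SubmitWorkInfo supplies the token material, that requestProto and tezEvents come from split generation, and that responder is the caller's LlapTaskUmbilicalExternalResponder implementation:

  // Hypothetical wiring; all parameters are placeholders supplied by the caller.
  static void runFragment(Configuration conf, SubmitWorkInfo submitWorkInfo,
      SubmitWorkRequestProto requestProto, String llapHost, int llapPort,
      List<TezEvent> tezEvents,
      LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder responder) {
    LlapTaskUmbilicalExternalClient client = new LlapTaskUmbilicalExternalClient(
        conf, submitWorkInfo.getTokenIdentifier(), submitWorkInfo.getToken(), responder);
    client.init(conf);
    client.start();
    // From here on the responder receives heartbeats, timeouts, kills and submission failures.
    client.submitWork(requestProto, llapHost, llapPort, tezEvents);
    // client.stop() should be called once the fragment has completed.
  }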
http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java b/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
new file mode 100644
index 0000000..dbd591a
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
@@ -0,0 +1,57 @@
+package org.apache.hadoop.hive.llap.tezplugins.helpers;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LlapTaskUmbilicalServer {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalServer.class);
+
+  protected volatile Server server;
+  private final InetSocketAddress address;
+  private final AtomicBoolean started = new AtomicBoolean(true);
+
+  public LlapTaskUmbilicalServer(Configuration conf, LlapTaskUmbilicalProtocol umbilical,
+      int numHandlers, String tokenIdentifier, Token<JobTokenIdentifier> token) throws IOException {
+    JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager();
+    jobTokenSecretManager.addTokenForJob(tokenIdentifier, token);
+
+    server = new RPC.Builder(conf)
+        .setProtocol(LlapTaskUmbilicalProtocol.class)
+        .setBindAddress("0.0.0.0")
+        .setPort(0)
+        .setInstance(umbilical)
+        .setNumHandlers(numHandlers)
+        .setSecretManager(jobTokenSecretManager).build();
+
+    server.start();
+    this.address = NetUtils.getConnectAddress(server);
+    LOG.info("Started TaskUmbilicalServer: " + umbilical.getClass().getName()
+        + " at address: " + address + " with numHandlers=" + numHandlers);
+  }
+
+  public InetSocketAddress getAddress() {
+    return this.address;
+  }
+
+  public void shutdownServer() {
+    if (started.compareAndSet(true, false)) { // Primarily to avoid multiple shutdowns.
+      server.stop();
+    }
+  }
+}
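
Standing the server up on its own follows the same pattern the external client uses internally; a minimal sketch, assuming umbilical is any LlapTaskUmbilicalProtocol implementation and jobToken is the session token shared with the daemons:

  // Hypothetical standalone setup; umbilical, tokenIdentifier and jobToken are placeholders.
  static InetSocketAddress serve(Configuration conf, LlapTaskUmbilicalProtocol umbilical,
      String tokenIdentifier, Token<JobTokenIdentifier> jobToken) throws IOException {
    LlapTaskUmbilicalServer server =
        new LlapTaskUmbilicalServer(conf, umbilical, 1, tokenIdentifier, jobToken);
    InetSocketAddress addr = server.getAddress(); // advertise this address to the LLAP daemons
    // ... run fragments; heartbeats arrive on the umbilical instance ...
    // server.shutdownServer(); // once all fragments are done
    return addr;
  }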