Repository: drill Updated Branches: refs/heads/master d3f8da2b6 -> 54d3d2018
http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/test/java/org/apache/drill/test/TestGracefulShutdown.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/TestGracefulShutdown.java b/exec/java-exec/src/test/java/org/apache/drill/test/TestGracefulShutdown.java new file mode 100644 index 0000000..f69152e --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/test/TestGracefulShutdown.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.test; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.server.Drillbit; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.Collection; +import java.util.Properties; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.fail; +import java.nio.file.Path; +import java.io.BufferedWriter; + + + + +public class TestGracefulShutdown extends BaseTestQuery{ + + @BeforeClass + public static void setUpTestData() throws Exception { + + for( int i = 0; i < 300; i++) { + setupFile(i); + } + } + + + public static final Properties WEBSERVER_CONFIGURATION = new Properties() { + { + put(ExecConstants.HTTP_ENABLE, true); + put(ExecConstants.HTTP_PORT_HUNT, true); + put(ExecConstants.DRILL_PORT_HUNT, true); + put(ExecConstants.GRACE_PERIOD, 10000); + } + }; + + public static final Properties DRILL_PORT_CONFIGURATION = new Properties() { + { + put(ExecConstants.DRILL_PORT_HUNT, true); + put(ExecConstants.GRACE_PERIOD, 10000); + } + }; + + public ClusterFixtureBuilder enableWebServer(ClusterFixtureBuilder builder) { + Properties props = new Properties(); + props.putAll(WEBSERVER_CONFIGURATION); + builder.configBuilder.configProps(props); + builder.sessionOption(ExecConstants.SLICE_TARGET, 10); + return builder; + } + + public ClusterFixtureBuilder enableDrillPortHunting(ClusterFixtureBuilder builder) { + Properties props = new Properties(); + props.putAll(DRILL_PORT_CONFIGURATION); + builder.configBuilder.configProps(props); + return builder; + } + + + + /* + Start multiple drillbits and then shutdown a drillbit. Query the online + endpoints and check if the drillbit still exists. + */ + @Test + public void testOnlineEndPoints() throws Exception { + + String[] drillbits = {"db1" ,"db2","db3", "db4", "db5", "db6"}; + ClusterFixtureBuilder builder = ClusterFixture.bareBuilder(dirTestWatcher).withBits(drillbits).withLocalZk(); + enableDrillPortHunting(builder); + + try ( ClusterFixture cluster = builder.build(); + ClientFixture client = cluster.clientFixture()) { + + Drillbit drillbit = cluster.drillbit("db2"); + DrillbitEndpoint drillbitEndpoint = drillbit.getRegistrationHandle().getEndPoint(); + int grace_period = drillbit.getContext().getConfig().getInt(ExecConstants.GRACE_PERIOD); + new Thread(new Runnable() { + public void run() { + try { + cluster.closeDrillbit("db2"); + } catch (Exception e) { + fail(); + } + } + }).start(); + //wait for graceperiod + Thread.sleep(grace_period); + Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit().getContext() + .getClusterCoordinator() + .getOnlineEndPoints(); + Assert.assertFalse(drillbitEndpoints.contains(drillbitEndpoint)); + } + } + /* + Test if the drillbit transitions from ONLINE state when a shutdown + request is initiated + */ + @Test + public void testStateChange() throws Exception { + + String[] drillbits = {"db1" ,"db2", "db3", "db4", "db5", "db6"}; + ClusterFixtureBuilder builder = ClusterFixture.bareBuilder(dirTestWatcher).withBits(drillbits).withLocalZk(); + enableDrillPortHunting(builder); + + try ( ClusterFixture cluster = builder.build(); + ClientFixture client = cluster.clientFixture()) { + Drillbit drillbit = cluster.drillbit("db2"); + int grace_period = drillbit.getContext().getConfig().getInt(ExecConstants.GRACE_PERIOD); + DrillbitEndpoint drillbitEndpoint = drillbit.getRegistrationHandle().getEndPoint(); + new Thread(new Runnable() { + public void run() { + try { + cluster.closeDrillbit("db2"); + } catch (Exception e) { + fail(); + } + } + }).start(); + Thread.sleep(grace_period); + Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit().getContext() + .getClusterCoordinator() + .getAvailableEndpoints(); + for (DrillbitEndpoint dbEndpoint : drillbitEndpoints) { + if(drillbitEndpoint.getAddress().equals(dbEndpoint.getAddress()) && drillbitEndpoint.getUserPort() == dbEndpoint.getUserPort()) { + assertNotEquals(dbEndpoint.getState(),DrillbitEndpoint.State.ONLINE); + } + } + } + } + + /* + Test shutdown through RestApi + */ + @Test + public void testRestApi() throws Exception { + + String[] drillbits = { "db1" ,"db2", "db3" }; + ClusterFixtureBuilder builder = ClusterFixture.bareBuilder(dirTestWatcher).withBits(drillbits).withLocalZk(); + builder = enableWebServer(builder); + QueryBuilder.QuerySummaryFuture listener; + final String sql = "select * from dfs.root.`.`"; + try ( ClusterFixture cluster = builder.build(); + final ClientFixture client = cluster.clientFixture()) { + Drillbit drillbit = cluster.drillbit("db1"); + int port = drillbit.getContext().getConfig().getInt("drill.exec.http.port"); + int grace_period = drillbit.getContext().getConfig().getInt(ExecConstants.GRACE_PERIOD); + listener = client.queryBuilder().sql(sql).futureSummary(); + Thread.sleep(60000); + while( port < 8049) { + URL url = new URL("http://localhost:"+port+"/gracefulShutdown"); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod("POST"); + if (conn.getResponseCode() != 200) { + throw new RuntimeException("Failed : HTTP error code : " + + conn.getResponseCode()); + } + port++; + } + Thread.sleep(grace_period); + Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit().getContext() + .getClusterCoordinator() + .getOnlineEndPoints(); + while(!listener.isDone()) { + Thread.sleep(10); + } + Assert.assertTrue(listener.isDone()); + Assert.assertEquals(1,drillbitEndpoints.size()); + } + } + + /* + Test default shutdown through RestApi + */ + @Test + public void testRestApiShutdown() throws Exception { + + String[] drillbits = {"db1" ,"db2", "db3"}; + ClusterFixtureBuilder builder = ClusterFixture.bareBuilder(dirTestWatcher).withBits(drillbits).withLocalZk(); + builder = enableWebServer(builder); + QueryBuilder.QuerySummaryFuture listener; + final String sql = "select * from dfs.root.`.`"; + try ( ClusterFixture cluster = builder.build(); + final ClientFixture client = cluster.clientFixture()) { + Drillbit drillbit = cluster.drillbit("db1"); + int port = drillbit.getContext().getConfig().getInt("drill.exec.http.port"); + int grace_period = drillbit.getContext().getConfig().getInt(ExecConstants.GRACE_PERIOD); + listener = client.queryBuilder().sql(sql).futureSummary(); + Thread.sleep(10000); + while( port < 8048) { + URL url = new URL("http://localhost:"+port+"/shutdown"); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod("POST"); + if (conn.getResponseCode() != 200) { + throw new RuntimeException("Failed : HTTP error code : " + + conn.getResponseCode()); + } + port++; + } + Thread.sleep(grace_period); + Thread.sleep(5000); + Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit().getContext() + .getClusterCoordinator().getAvailableEndpoints(); + while(!listener.isDone()) { + Thread.sleep(10); + } + Assert.assertTrue(listener.isDone()); + Assert.assertEquals(2,drillbitEndpoints.size()); + } + } + + private static void setupFile(int file_num) throws Exception { + final String file = "employee"+file_num+".json"; + final Path path = dirTestWatcher.getRootDir().toPath().resolve(file); + try(PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(path.toFile(), true)))) { + out.println("{\"employee_id\":1,\"full_name\":\"Sheri Nowmer\",\"first_name\":\"Sheri\",\"last_name\":\"Nowmer\",\"position_id\":1,\"position_title\":\"President\",\"store_id\":0,\"department_id\":1,\"birth_date\":\"1961-08-26\",\"hire_date\":\"1994-12-01 00:00:00.0\",\"end_date\":null,\"salary\":80000.0000,\"supervisor_id\":0,\"education_level\":\"Graduate Degree\",\"marital_status\":\"S\",\"gender\":\"F\",\"management_role\":\"Senior Management\"}\n" + + "{\"employee_id\":2,\"full_name\":\"Derrick Whelply\",\"first_name\":\"Derrick\",\"last_name\":\"Whelply\",\"position_id\":2,\"position_title\":\"VP Country Manager\",\"store_id\":0,\"department_id\":1,\"birth_date\":\"1915-07-03\",\"hire_date\":\"1994-12-01 00:00:00.0\",\"end_date\":null,\"salary\":40000.0000,\"supervisor_id\":1,\"education_level\":\"Graduate Degree\",\"marital_status\":\"M\",\"gender\":\"M\",\"management_role\":\"Senior Management\"}\n" + + "{\"employee_id\":4,\"full_name\":\"Michael Spence\",\"first_name\":\"Michael\",\"last_name\":\"Spence\",\"position_id\":2,\"position_title\":\"VP Country Manager\",\"store_id\":0,\"department_id\":1,\"birth_date\":\"1969-06-20\",\"hire_date\":\"1998-01-01 00:00:00.0\",\"end_date\":null,\"salary\":40000.0000,\"supervisor_id\":1,\"education_level\":\"Graduate Degree\",\"marital_status\":\"S\",\"gender\":\"M\",\"management_role\":\"Senior Management\"}\n" + + "{\"employee_id\":5,\"full_name\":\"Maya Gutierrez\",\"first_name\":\"Maya\",\"last_name\":\"Gutierrez\",\"position_id\":2,\"position_title\":\"VP Country Manager\",\"store_id\":0,\"department_id\":1,\"birth_date\":\"1951-05-10\",\"hire_date\":\"1998-01-01 00:00:00.0\",\"end_date\":null,\"salary\":35000.0000,\"supervisor_id\":1,\"education_level\":\"Bachelors Degree\",\"marital_status\":\"M\",\"gender\":\"F\",\"management_role\":\"Senior Management\"}\n" + + "{\"employee_id\":6,\"full_name\":\"Roberta Damstra\",\"first_name\":\"Roberta\",\"last_name\":\"Damstra\",\"position_id\":3,\"position_title\":\"VP Information Systems\",\"store_id\":0,\"department_id\":2,\"birth_date\":\"1942-10-08\",\"hire_date\":\"1994-12-01 00:00:00.0\",\"end_date\":null,\"salary\":25000.0000,\"supervisor_id\":1,\"education_level\":\"Bachelors Degree\",\"marital_status\":\"M\",\"gender\":\"F\",\"management_role\":\"Senior Management\"}\n" + + "{\"employee_id\":7,\"full_name\":\"Rebecca Kanagaki\",\"first_name\":\"Rebecca\",\"last_name\":\"Kanagaki\",\"position_id\":4,\"position_title\":\"VP Human Resources\",\"store_id\":0,\"department_id\":3,\"birth_date\":\"1949-03-27\",\"hire_date\":\"1994-12-01 00:00:00.0\",\"end_date\":null,\"salary\":15000.0000,\"supervisor_id\":1,\"education_level\":\"Bachelors Degree\",\"marital_status\":\"M\",\"gender\":\"F\",\"management_role\":\"Senior Management\"}\n"); + } catch (IOException e) { + fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/protocol/src/main/java/org/apache/drill/exec/proto/CoordinationProtos.java ---------------------------------------------------------------------- diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/CoordinationProtos.java b/protocol/src/main/java/org/apache/drill/exec/proto/CoordinationProtos.java index 4fa28df..2199085 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/CoordinationProtos.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/CoordinationProtos.java @@ -101,6 +101,16 @@ public final class CoordinationProtos { */ com.google.protobuf.ByteString getVersionBytes(); + + // optional .exec.DrillbitEndpoint.State state = 7; + /** + * <code>optional .exec.DrillbitEndpoint.State state = 7;</code> + */ + boolean hasState(); + /** + * <code>optional .exec.DrillbitEndpoint.State state = 7;</code> + */ + org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State getState(); } /** * Protobuf type {@code exec.DrillbitEndpoint} @@ -191,6 +201,17 @@ public final class CoordinationProtos { version_ = input.readBytes(); break; } + case 56: { + int rawValue = input.readEnum(); + org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State value = org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State.valueOf(rawValue); + if (value == null) { + unknownFields.mergeVarintField(7, rawValue); + } else { + bitField0_ |= 0x00000040; + state_ = value; + } + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -230,6 +251,106 @@ public final class CoordinationProtos { return PARSER; } + /** + * Protobuf enum {@code exec.DrillbitEndpoint.State} + */ + public enum State + implements com.google.protobuf.ProtocolMessageEnum { + /** + * <code>STARTUP = 0;</code> + */ + STARTUP(0, 0), + /** + * <code>ONLINE = 1;</code> + */ + ONLINE(1, 1), + /** + * <code>QUIESCENT = 2;</code> + */ + QUIESCENT(2, 2), + /** + * <code>OFFLINE = 3;</code> + */ + OFFLINE(3, 3), + ; + + /** + * <code>STARTUP = 0;</code> + */ + public static final int STARTUP_VALUE = 0; + /** + * <code>ONLINE = 1;</code> + */ + public static final int ONLINE_VALUE = 1; + /** + * <code>QUIESCENT = 2;</code> + */ + public static final int QUIESCENT_VALUE = 2; + /** + * <code>OFFLINE = 3;</code> + */ + public static final int OFFLINE_VALUE = 3; + + + public final int getNumber() { return value; } + + public static State valueOf(int value) { + switch (value) { + case 0: return STARTUP; + case 1: return ONLINE; + case 2: return QUIESCENT; + case 3: return OFFLINE; + default: return null; + } + } + + public static com.google.protobuf.Internal.EnumLiteMap<State> + internalGetValueMap() { + return internalValueMap; + } + private static com.google.protobuf.Internal.EnumLiteMap<State> + internalValueMap = + new com.google.protobuf.Internal.EnumLiteMap<State>() { + public State findValueByNumber(int number) { + return State.valueOf(number); + } + }; + + public final com.google.protobuf.Descriptors.EnumValueDescriptor + getValueDescriptor() { + return getDescriptor().getValues().get(index); + } + public final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptorForType() { + return getDescriptor(); + } + public static final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptor() { + return org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.getDescriptor().getEnumTypes().get(0); + } + + private static final State[] VALUES = values(); + + public static State valueOf( + com.google.protobuf.Descriptors.EnumValueDescriptor desc) { + if (desc.getType() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "EnumValueDescriptor is not for this type."); + } + return VALUES[desc.getIndex()]; + } + + private final int index; + private final int value; + + private State(int index, int value) { + this.index = index; + this.value = value; + } + + // @@protoc_insertion_point(enum_scope:exec.DrillbitEndpoint.State) + } + private int bitField0_; // optional string address = 1; public static final int ADDRESS_FIELD_NUMBER = 1; @@ -387,6 +508,22 @@ public final class CoordinationProtos { } } + // optional .exec.DrillbitEndpoint.State state = 7; + public static final int STATE_FIELD_NUMBER = 7; + private org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State state_; + /** + * <code>optional .exec.DrillbitEndpoint.State state = 7;</code> + */ + public boolean hasState() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * <code>optional .exec.DrillbitEndpoint.State state = 7;</code> + */ + public org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State getState() { + return state_; + } + private void initFields() { address_ = ""; userPort_ = 0; @@ -394,6 +531,7 @@ public final class CoordinationProtos { dataPort_ = 0; roles_ = org.apache.drill.exec.proto.CoordinationProtos.Roles.getDefaultInstance(); version_ = ""; + state_ = org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State.STARTUP; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -425,6 +563,9 @@ public final class CoordinationProtos { if (((bitField0_ & 0x00000020) == 0x00000020)) { output.writeBytes(6, getVersionBytes()); } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + output.writeEnum(7, state_.getNumber()); + } getUnknownFields().writeTo(output); } @@ -458,6 +599,10 @@ public final class CoordinationProtos { size += com.google.protobuf.CodedOutputStream .computeBytesSize(6, getVersionBytes()); } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + size += com.google.protobuf.CodedOutputStream + .computeEnumSize(7, state_.getNumber()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -591,6 +736,8 @@ public final class CoordinationProtos { bitField0_ = (bitField0_ & ~0x00000010); version_ = ""; bitField0_ = (bitField0_ & ~0x00000020); + state_ = org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State.STARTUP; + bitField0_ = (bitField0_ & ~0x00000040); return this; } @@ -647,6 +794,10 @@ public final class CoordinationProtos { to_bitField0_ |= 0x00000020; } result.version_ = version_; + if (((from_bitField0_ & 0x00000040) == 0x00000040)) { + to_bitField0_ |= 0x00000040; + } + result.state_ = state_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -685,6 +836,9 @@ public final class CoordinationProtos { version_ = other.version_; onChanged(); } + if (other.hasState()) { + setState(other.getState()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -1076,6 +1230,42 @@ public final class CoordinationProtos { return this; } + // optional .exec.DrillbitEndpoint.State state = 7; + private org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State state_ = org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State.STARTUP; + /** + * <code>optional .exec.DrillbitEndpoint.State state = 7;</code> + */ + public boolean hasState() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * <code>optional .exec.DrillbitEndpoint.State state = 7;</code> + */ + public org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State getState() { + return state_; + } + /** + * <code>optional .exec.DrillbitEndpoint.State state = 7;</code> + */ + public Builder setState(org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000040; + state_ = value; + onChanged(); + return this; + } + /** + * <code>optional .exec.DrillbitEndpoint.State state = 7;</code> + */ + public Builder clearState() { + bitField0_ = (bitField0_ & ~0x00000040); + state_ = org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State.STARTUP; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:exec.DrillbitEndpoint) } @@ -2575,19 +2765,21 @@ public final class CoordinationProtos { descriptor; static { java.lang.String[] descriptorData = { - "\n\022Coordination.proto\022\004exec\"\214\001\n\020DrillbitE" + + "\n\022Coordination.proto\022\004exec\"\367\001\n\020DrillbitE" + "ndpoint\022\017\n\007address\030\001 \001(\t\022\021\n\tuser_port\030\002 " + "\001(\005\022\024\n\014control_port\030\003 \001(\005\022\021\n\tdata_port\030\004" + " \001(\005\022\032\n\005roles\030\005 \001(\0132\013.exec.Roles\022\017\n\007vers" + - "ion\030\006 \001(\t\"i\n\024DrillServiceInstance\022\n\n\002id\030" + - "\001 \001(\t\022\033\n\023registrationTimeUTC\030\002 \001(\003\022(\n\010en" + - "dpoint\030\003 \001(\0132\026.exec.DrillbitEndpoint\"\227\001\n" + - "\005Roles\022\027\n\tsql_query\030\001 \001(\010:\004true\022\032\n\014logic" + - "al_plan\030\002 \001(\010:\004true\022\033\n\rphysical_plan\030\003 \001" + - "(\010:\004true\022\033\n\rjava_executor\030\004 \001(\010:\004true\022\037\n", - "\021distributed_cache\030\005 \001(\010:\004trueB3\n\033org.ap" + - "ache.drill.exec.protoB\022CoordinationProto" + - "sH\001" + "ion\030\006 \001(\t\022+\n\005state\030\007 \001(\0162\034.exec.Drillbit" + + "Endpoint.State\"<\n\005State\022\013\n\007STARTUP\020\000\022\n\n\006" + + "ONLINE\020\001\022\r\n\tQUIESCENT\020\002\022\013\n\007OFFLINE\020\003\"i\n\024" + + "DrillServiceInstance\022\n\n\002id\030\001 \001(\t\022\033\n\023regi" + + "strationTimeUTC\030\002 \001(\003\022(\n\010endpoint\030\003 \001(\0132" + + "\026.exec.DrillbitEndpoint\"\227\001\n\005Roles\022\027\n\tsql", + "_query\030\001 \001(\010:\004true\022\032\n\014logical_plan\030\002 \001(\010" + + ":\004true\022\033\n\rphysical_plan\030\003 \001(\010:\004true\022\033\n\rj" + + "ava_executor\030\004 \001(\010:\004true\022\037\n\021distributed_" + + "cache\030\005 \001(\010:\004trueB3\n\033org.apache.drill.ex" + + "ec.protoB\022CoordinationProtosH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -2599,7 +2791,7 @@ public final class CoordinationProtos { internal_static_exec_DrillbitEndpoint_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_exec_DrillbitEndpoint_descriptor, - new java.lang.String[] { "Address", "UserPort", "ControlPort", "DataPort", "Roles", "Version", }); + new java.lang.String[] { "Address", "UserPort", "ControlPort", "DataPort", "Roles", "Version", "State", }); internal_static_exec_DrillServiceInstance_descriptor = getDescriptor().getMessageTypes().get(1); internal_static_exec_DrillServiceInstance_fieldAccessorTable = new http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaCoordinationProtos.java ---------------------------------------------------------------------- diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaCoordinationProtos.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaCoordinationProtos.java index a7d83e4..34b91d6 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaCoordinationProtos.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaCoordinationProtos.java @@ -48,6 +48,8 @@ public final class SchemaCoordinationProtos if(message.hasVersion()) output.writeString(6, message.getVersion(), false); + if(message.hasState()) + output.writeEnum(7, message.getState().getNumber(), false); } public boolean isInitialized(org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint message) { @@ -106,6 +108,9 @@ public final class SchemaCoordinationProtos case 6: builder.setVersion(input.readString()); break; + case 7: + builder.setState(org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State.valueOf(input.readEnum())); + break; default: input.handleUnknownField(number, this); } @@ -152,6 +157,7 @@ public final class SchemaCoordinationProtos case 4: return "dataPort"; case 5: return "roles"; case 6: return "version"; + case 7: return "state"; default: return null; } } @@ -169,6 +175,7 @@ public final class SchemaCoordinationProtos fieldMap.put("dataPort", 4); fieldMap.put("roles", 5); fieldMap.put("version", 6); + fieldMap.put("state", 7); } } http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/protocol/src/main/java/org/apache/drill/exec/proto/beans/DrillbitEndpoint.java ---------------------------------------------------------------------- diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/DrillbitEndpoint.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/DrillbitEndpoint.java index 2257763..442c765 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/DrillbitEndpoint.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/DrillbitEndpoint.java @@ -33,6 +33,38 @@ import com.dyuproject.protostuff.Schema; public final class DrillbitEndpoint implements Externalizable, Message<DrillbitEndpoint>, Schema<DrillbitEndpoint> { + public enum State implements com.dyuproject.protostuff.EnumLite<State> + { + STARTUP(0), + ONLINE(1), + QUIESCENT(2), + OFFLINE(3); + + public final int number; + + private State (int number) + { + this.number = number; + } + + public int getNumber() + { + return number; + } + + public static State valueOf(int number) + { + switch(number) + { + case 0: return STARTUP; + case 1: return ONLINE; + case 2: return QUIESCENT; + case 3: return OFFLINE; + default: return null; + } + } + } + public static Schema<DrillbitEndpoint> getSchema() { @@ -53,6 +85,7 @@ public final class DrillbitEndpoint implements Externalizable, Message<DrillbitE private int dataPort; private Roles roles; private String version; + private State state; public DrillbitEndpoint() { @@ -139,6 +172,19 @@ public final class DrillbitEndpoint implements Externalizable, Message<DrillbitE return this; } + // state + + public State getState() + { + return state == null ? State.STARTUP : state; + } + + public DrillbitEndpoint setState(State state) + { + this.state = state; + return this; + } + // java serialization public void readExternal(ObjectInput in) throws IOException @@ -212,6 +258,9 @@ public final class DrillbitEndpoint implements Externalizable, Message<DrillbitE case 6: message.version = input.readString(); break; + case 7: + message.state = State.valueOf(input.readEnum()); + break; default: input.handleUnknownField(number, this); } @@ -239,6 +288,9 @@ public final class DrillbitEndpoint implements Externalizable, Message<DrillbitE if(message.version != null) output.writeString(6, message.version, false); + + if(message.state != null) + output.writeEnum(7, message.state.number, false); } public String getFieldName(int number) @@ -251,6 +303,7 @@ public final class DrillbitEndpoint implements Externalizable, Message<DrillbitE case 4: return "dataPort"; case 5: return "roles"; case 6: return "version"; + case 7: return "state"; default: return null; } } @@ -270,6 +323,7 @@ public final class DrillbitEndpoint implements Externalizable, Message<DrillbitE __fieldMap.put("dataPort", 4); __fieldMap.put("roles", 5); __fieldMap.put("version", 6); + __fieldMap.put("state", 7); } } http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/protocol/src/main/protobuf/Coordination.proto ---------------------------------------------------------------------- diff --git a/protocol/src/main/protobuf/Coordination.proto b/protocol/src/main/protobuf/Coordination.proto index 3f15cf9..0bbb223 100644 --- a/protocol/src/main/protobuf/Coordination.proto +++ b/protocol/src/main/protobuf/Coordination.proto @@ -11,6 +11,13 @@ message DrillbitEndpoint{ optional int32 data_port = 4; optional Roles roles = 5; optional string version = 6; + enum State { + STARTUP = 0; + ONLINE = 1; + QUIESCENT = 2; + OFFLINE = 3; + } + optional State state = 7; } message DrillServiceInstance{