ACCUMULO-3423 respond to elserj's review
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c8f3b7d3 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c8f3b7d3 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c8f3b7d3 Branch: refs/heads/master Commit: c8f3b7d3b8591ea2330437549b9578ae9feebaaf Parents: b2539fb Author: Eric C. Newton <eric.new...@gmail.com> Authored: Thu Feb 26 15:17:04 2015 -0500 Committer: Eric C. Newton <eric.new...@gmail.com> Committed: Thu Feb 26 15:17:04 2015 -0500 ---------------------------------------------------------------------- .../accumulo/core/replication/StatusUtil.java | 9 + .../thrift/TabletClientService.java | 749 ++++++++++++++++++- core/src/main/thrift/tabletserver.thrift | 1 + .../server/master/state/MetaDataStateStore.java | 3 + .../master/state/MetaDataTableScanner.java | 7 +- .../server/master/state/TabletStateStore.java | 3 + .../master/state/ZooTabletStateStore.java | 1 + .../accumulo/server/util/MetadataTableUtil.java | 7 +- .../gc/GarbageCollectWriteAheadLogs.java | 4 +- .../accumulo/master/replication/WorkMaker.java | 2 +- .../apache/accumulo/tserver/TabletServer.java | 5 + .../tserver/log/TabletServerLogger.java | 42 +- .../apache/accumulo/tserver/tablet/Tablet.java | 1 + .../test/performance/thrift/NullTserver.java | 3 + 14 files changed, 793 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java b/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java index a7cd3f5..d8ec403 100644 --- a/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java @@ -32,6 +32,7 @@ public class StatusUtil { private static final Value INF_END_REPLICATION_STATUS_VALUE, CLOSED_STATUS_VALUE; private static final Status.Builder CREATED_STATUS_BUILDER; + private static final Status.Builder INF_END_REPLICATION_STATUS_BUILDER; static { CREATED_STATUS_BUILDER = Status.newBuilder(); @@ -45,6 +46,7 @@ public class StatusUtil { builder.setEnd(0); builder.setInfiniteEnd(true); builder.setClosed(false); + INF_END_REPLICATION_STATUS_BUILDER = builder; INF_END_REPLICATION_STATUS = builder.build(); INF_END_REPLICATION_STATUS_VALUE = ProtobufUtil.toValue(INF_END_REPLICATION_STATUS); @@ -153,6 +155,13 @@ public class StatusUtil { /** * @return A {@link Status} for an open file of unspecified length, all of which needs replicating. */ + public static Status openWithUnknownLength(long timeCreated) { + return INF_END_REPLICATION_STATUS_BUILDER.setCreatedTime(timeCreated).build(); + } + + /** + * @return A {@link Status} for an open file of unspecified length, all of which needs replicating. + */ public static Status openWithUnknownLength() { return INF_END_REPLICATION_STATUS; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java index d6d4afd..02bd4e1 100644 --- a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java +++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java @@ -110,6 +110,8 @@ import org.slf4j.LoggerFactory; public List<ActiveCompaction> getActiveCompactions(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, org.apache.thrift.TException; + public void removeLogs(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, List<String> filenames) throws org.apache.thrift.TException; + public List<String> getActiveLogs(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws org.apache.thrift.TException; } @@ -174,6 +176,8 @@ import org.slf4j.LoggerFactory; public void getActiveCompactions(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; + public void removeLogs(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, List<String> filenames, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; + public void getActiveLogs(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; } @@ -896,6 +900,20 @@ import org.slf4j.LoggerFactory; throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "getActiveCompactions failed: unknown result"); } + public void removeLogs(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, List<String> filenames) throws org.apache.thrift.TException + { + send_removeLogs(tinfo, credentials, filenames); + } + + public void send_removeLogs(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, List<String> filenames) throws org.apache.thrift.TException + { + removeLogs_args args = new removeLogs_args(); + args.setTinfo(tinfo); + args.setCredentials(credentials); + args.setFilenames(filenames); + sendBase("removeLogs", args); + } + public List<String> getActiveLogs(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws org.apache.thrift.TException { send_getActiveLogs(tinfo, credentials); @@ -2099,6 +2117,43 @@ import org.slf4j.LoggerFactory; } } + public void removeLogs(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, List<String> filenames, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { + checkReady(); + removeLogs_call method_call = new removeLogs_call(tinfo, credentials, filenames, resultHandler, this, ___protocolFactory, ___transport); + this.___currentMethod = method_call; + ___manager.call(method_call); + } + + public static class removeLogs_call extends org.apache.thrift.async.TAsyncMethodCall { + private org.apache.accumulo.core.trace.thrift.TInfo tinfo; + private org.apache.accumulo.core.security.thrift.TCredentials credentials; + private List<String> filenames; + public removeLogs_call(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, List<String> filenames, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { + super(client, protocolFactory, transport, resultHandler, true); + this.tinfo = tinfo; + this.credentials = credentials; + this.filenames = filenames; + } + + public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { + prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("removeLogs", org.apache.thrift.protocol.TMessageType.CALL, 0)); + removeLogs_args args = new removeLogs_args(); + args.setTinfo(tinfo); + args.setCredentials(credentials); + args.setFilenames(filenames); + args.write(prot); + prot.writeMessageEnd(); + } + + public void getResult() throws org.apache.thrift.TException { + if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) { + throw new IllegalStateException("Method call not finished!"); + } + org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array()); + org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport); + } + } + public void getActiveLogs(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { checkReady(); getActiveLogs_call method_call = new getActiveLogs_call(tinfo, credentials, resultHandler, this, ___protocolFactory, ___transport); @@ -2176,6 +2231,7 @@ import org.slf4j.LoggerFactory; processMap.put("fastHalt", new fastHalt()); processMap.put("getActiveScans", new getActiveScans()); processMap.put("getActiveCompactions", new getActiveCompactions()); + processMap.put("removeLogs", new removeLogs()); processMap.put("getActiveLogs", new getActiveLogs()); return processMap; } @@ -2837,6 +2893,25 @@ import org.slf4j.LoggerFactory; } } + public static class removeLogs<I extends Iface> extends org.apache.thrift.ProcessFunction<I, removeLogs_args> { + public removeLogs() { + super("removeLogs"); + } + + public removeLogs_args getEmptyArgsInstance() { + return new removeLogs_args(); + } + + protected boolean isOneway() { + return true; + } + + public org.apache.thrift.TBase getResult(I iface, removeLogs_args args) throws org.apache.thrift.TException { + iface.removeLogs(args.tinfo, args.credentials, args.filenames); + return null; + } + } + public static class getActiveLogs<I extends Iface> extends org.apache.thrift.ProcessFunction<I, getActiveLogs_args> { public getActiveLogs() { super("getActiveLogs"); @@ -2899,6 +2974,7 @@ import org.slf4j.LoggerFactory; processMap.put("fastHalt", new fastHalt()); processMap.put("getActiveScans", new getActiveScans()); processMap.put("getActiveCompactions", new getActiveCompactions()); + processMap.put("removeLogs", new removeLogs()); processMap.put("getActiveLogs", new getActiveLogs()); return processMap; } @@ -4291,6 +4367,34 @@ import org.slf4j.LoggerFactory; } } + public static class removeLogs<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, removeLogs_args, Void> { + public removeLogs() { + super("removeLogs"); + } + + public removeLogs_args getEmptyArgsInstance() { + return new removeLogs_args(); + } + + public AsyncMethodCallback<Void> getResultHandler(final AsyncFrameBuffer fb, final int seqid) { + final org.apache.thrift.AsyncProcessFunction fcall = this; + return new AsyncMethodCallback<Void>() { + public void onComplete(Void o) { + } + public void onError(Exception e) { + } + }; + } + + protected boolean isOneway() { + return true; + } + + public void start(I iface, removeLogs_args args, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws TException { + iface.removeLogs(args.tinfo, args.credentials, args.filenames,resultHandler); + } + } + public static class getActiveLogs<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, getActiveLogs_args, List<String>> { public getActiveLogs() { super("getActiveLogs"); @@ -33069,6 +33173,619 @@ import org.slf4j.LoggerFactory; } + public static class removeLogs_args implements org.apache.thrift.TBase<removeLogs_args, removeLogs_args._Fields>, java.io.Serializable, Cloneable, Comparable<removeLogs_args> { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("removeLogs_args"); + + private static final org.apache.thrift.protocol.TField TINFO_FIELD_DESC = new org.apache.thrift.protocol.TField("tinfo", org.apache.thrift.protocol.TType.STRUCT, (short)1); + private static final org.apache.thrift.protocol.TField CREDENTIALS_FIELD_DESC = new org.apache.thrift.protocol.TField("credentials", org.apache.thrift.protocol.TType.STRUCT, (short)2); + private static final org.apache.thrift.protocol.TField FILENAMES_FIELD_DESC = new org.apache.thrift.protocol.TField("filenames", org.apache.thrift.protocol.TType.LIST, (short)3); + + private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); + static { + schemes.put(StandardScheme.class, new removeLogs_argsStandardSchemeFactory()); + schemes.put(TupleScheme.class, new removeLogs_argsTupleSchemeFactory()); + } + + public org.apache.accumulo.core.trace.thrift.TInfo tinfo; // required + public org.apache.accumulo.core.security.thrift.TCredentials credentials; // required + public List<String> filenames; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + TINFO((short)1, "tinfo"), + CREDENTIALS((short)2, "credentials"), + FILENAMES((short)3, "filenames"); + + private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // TINFO + return TINFO; + case 2: // CREDENTIALS + return CREDENTIALS; + case 3: // FILENAMES + return FILENAMES; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.TINFO, new org.apache.thrift.meta_data.FieldMetaData("tinfo", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.trace.thrift.TInfo.class))); + tmpMap.put(_Fields.CREDENTIALS, new org.apache.thrift.meta_data.FieldMetaData("credentials", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.security.thrift.TCredentials.class))); + tmpMap.put(_Fields.FILENAMES, new org.apache.thrift.meta_data.FieldMetaData("filenames", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(removeLogs_args.class, metaDataMap); + } + + public removeLogs_args() { + } + + public removeLogs_args( + org.apache.accumulo.core.trace.thrift.TInfo tinfo, + org.apache.accumulo.core.security.thrift.TCredentials credentials, + List<String> filenames) + { + this(); + this.tinfo = tinfo; + this.credentials = credentials; + this.filenames = filenames; + } + + /** + * Performs a deep copy on <i>other</i>. + */ + public removeLogs_args(removeLogs_args other) { + if (other.isSetTinfo()) { + this.tinfo = new org.apache.accumulo.core.trace.thrift.TInfo(other.tinfo); + } + if (other.isSetCredentials()) { + this.credentials = new org.apache.accumulo.core.security.thrift.TCredentials(other.credentials); + } + if (other.isSetFilenames()) { + List<String> __this__filenames = new ArrayList<String>(other.filenames); + this.filenames = __this__filenames; + } + } + + public removeLogs_args deepCopy() { + return new removeLogs_args(this); + } + + @Override + public void clear() { + this.tinfo = null; + this.credentials = null; + this.filenames = null; + } + + public org.apache.accumulo.core.trace.thrift.TInfo getTinfo() { + return this.tinfo; + } + + public removeLogs_args setTinfo(org.apache.accumulo.core.trace.thrift.TInfo tinfo) { + this.tinfo = tinfo; + return this; + } + + public void unsetTinfo() { + this.tinfo = null; + } + + /** Returns true if field tinfo is set (has been assigned a value) and false otherwise */ + public boolean isSetTinfo() { + return this.tinfo != null; + } + + public void setTinfoIsSet(boolean value) { + if (!value) { + this.tinfo = null; + } + } + + public org.apache.accumulo.core.security.thrift.TCredentials getCredentials() { + return this.credentials; + } + + public removeLogs_args setCredentials(org.apache.accumulo.core.security.thrift.TCredentials credentials) { + this.credentials = credentials; + return this; + } + + public void unsetCredentials() { + this.credentials = null; + } + + /** Returns true if field credentials is set (has been assigned a value) and false otherwise */ + public boolean isSetCredentials() { + return this.credentials != null; + } + + public void setCredentialsIsSet(boolean value) { + if (!value) { + this.credentials = null; + } + } + + public int getFilenamesSize() { + return (this.filenames == null) ? 0 : this.filenames.size(); + } + + public java.util.Iterator<String> getFilenamesIterator() { + return (this.filenames == null) ? null : this.filenames.iterator(); + } + + public void addToFilenames(String elem) { + if (this.filenames == null) { + this.filenames = new ArrayList<String>(); + } + this.filenames.add(elem); + } + + public List<String> getFilenames() { + return this.filenames; + } + + public removeLogs_args setFilenames(List<String> filenames) { + this.filenames = filenames; + return this; + } + + public void unsetFilenames() { + this.filenames = null; + } + + /** Returns true if field filenames is set (has been assigned a value) and false otherwise */ + public boolean isSetFilenames() { + return this.filenames != null; + } + + public void setFilenamesIsSet(boolean value) { + if (!value) { + this.filenames = null; + } + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case TINFO: + if (value == null) { + unsetTinfo(); + } else { + setTinfo((org.apache.accumulo.core.trace.thrift.TInfo)value); + } + break; + + case CREDENTIALS: + if (value == null) { + unsetCredentials(); + } else { + setCredentials((org.apache.accumulo.core.security.thrift.TCredentials)value); + } + break; + + case FILENAMES: + if (value == null) { + unsetFilenames(); + } else { + setFilenames((List<String>)value); + } + break; + + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case TINFO: + return getTinfo(); + + case CREDENTIALS: + return getCredentials(); + + case FILENAMES: + return getFilenames(); + + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case TINFO: + return isSetTinfo(); + case CREDENTIALS: + return isSetCredentials(); + case FILENAMES: + return isSetFilenames(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof removeLogs_args) + return this.equals((removeLogs_args)that); + return false; + } + + public boolean equals(removeLogs_args that) { + if (that == null) + return false; + + boolean this_present_tinfo = true && this.isSetTinfo(); + boolean that_present_tinfo = true && that.isSetTinfo(); + if (this_present_tinfo || that_present_tinfo) { + if (!(this_present_tinfo && that_present_tinfo)) + return false; + if (!this.tinfo.equals(that.tinfo)) + return false; + } + + boolean this_present_credentials = true && this.isSetCredentials(); + boolean that_present_credentials = true && that.isSetCredentials(); + if (this_present_credentials || that_present_credentials) { + if (!(this_present_credentials && that_present_credentials)) + return false; + if (!this.credentials.equals(that.credentials)) + return false; + } + + boolean this_present_filenames = true && this.isSetFilenames(); + boolean that_present_filenames = true && that.isSetFilenames(); + if (this_present_filenames || that_present_filenames) { + if (!(this_present_filenames && that_present_filenames)) + return false; + if (!this.filenames.equals(that.filenames)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public int compareTo(removeLogs_args other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = Boolean.valueOf(isSetTinfo()).compareTo(other.isSetTinfo()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetTinfo()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.tinfo, other.tinfo); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetCredentials()).compareTo(other.isSetCredentials()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetCredentials()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.credentials, other.credentials); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetFilenames()).compareTo(other.isSetFilenames()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetFilenames()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.filenames, other.filenames); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + schemes.get(iprot.getScheme()).getScheme().read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + schemes.get(oprot.getScheme()).getScheme().write(oprot, this); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("removeLogs_args("); + boolean first = true; + + sb.append("tinfo:"); + if (this.tinfo == null) { + sb.append("null"); + } else { + sb.append(this.tinfo); + } + first = false; + if (!first) sb.append(", "); + sb.append("credentials:"); + if (this.credentials == null) { + sb.append("null"); + } else { + sb.append(this.credentials); + } + first = false; + if (!first) sb.append(", "); + sb.append("filenames:"); + if (this.filenames == null) { + sb.append("null"); + } else { + sb.append(this.filenames); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + if (tinfo != null) { + tinfo.validate(); + } + if (credentials != null) { + credentials.validate(); + } + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class removeLogs_argsStandardSchemeFactory implements SchemeFactory { + public removeLogs_argsStandardScheme getScheme() { + return new removeLogs_argsStandardScheme(); + } + } + + private static class removeLogs_argsStandardScheme extends StandardScheme<removeLogs_args> { + + public void read(org.apache.thrift.protocol.TProtocol iprot, removeLogs_args struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // TINFO + if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { + struct.tinfo = new org.apache.accumulo.core.trace.thrift.TInfo(); + struct.tinfo.read(iprot); + struct.setTinfoIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 2: // CREDENTIALS + if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { + struct.credentials = new org.apache.accumulo.core.security.thrift.TCredentials(); + struct.credentials.read(iprot); + struct.setCredentialsIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 3: // FILENAMES + if (schemeField.type == org.apache.thrift.protocol.TType.LIST) { + { + org.apache.thrift.protocol.TList _list314 = iprot.readListBegin(); + struct.filenames = new ArrayList<String>(_list314.size); + for (int _i315 = 0; _i315 < _list314.size; ++_i315) + { + String _elem316; + _elem316 = iprot.readString(); + struct.filenames.add(_elem316); + } + iprot.readListEnd(); + } + struct.setFilenamesIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, removeLogs_args struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.tinfo != null) { + oprot.writeFieldBegin(TINFO_FIELD_DESC); + struct.tinfo.write(oprot); + oprot.writeFieldEnd(); + } + if (struct.credentials != null) { + oprot.writeFieldBegin(CREDENTIALS_FIELD_DESC); + struct.credentials.write(oprot); + oprot.writeFieldEnd(); + } + if (struct.filenames != null) { + oprot.writeFieldBegin(FILENAMES_FIELD_DESC); + { + oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, struct.filenames.size())); + for (String _iter317 : struct.filenames) + { + oprot.writeString(_iter317); + } + oprot.writeListEnd(); + } + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class removeLogs_argsTupleSchemeFactory implements SchemeFactory { + public removeLogs_argsTupleScheme getScheme() { + return new removeLogs_argsTupleScheme(); + } + } + + private static class removeLogs_argsTupleScheme extends TupleScheme<removeLogs_args> { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, removeLogs_args struct) throws org.apache.thrift.TException { + TTupleProtocol oprot = (TTupleProtocol) prot; + BitSet optionals = new BitSet(); + if (struct.isSetTinfo()) { + optionals.set(0); + } + if (struct.isSetCredentials()) { + optionals.set(1); + } + if (struct.isSetFilenames()) { + optionals.set(2); + } + oprot.writeBitSet(optionals, 3); + if (struct.isSetTinfo()) { + struct.tinfo.write(oprot); + } + if (struct.isSetCredentials()) { + struct.credentials.write(oprot); + } + if (struct.isSetFilenames()) { + { + oprot.writeI32(struct.filenames.size()); + for (String _iter318 : struct.filenames) + { + oprot.writeString(_iter318); + } + } + } + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, removeLogs_args struct) throws org.apache.thrift.TException { + TTupleProtocol iprot = (TTupleProtocol) prot; + BitSet incoming = iprot.readBitSet(3); + if (incoming.get(0)) { + struct.tinfo = new org.apache.accumulo.core.trace.thrift.TInfo(); + struct.tinfo.read(iprot); + struct.setTinfoIsSet(true); + } + if (incoming.get(1)) { + struct.credentials = new org.apache.accumulo.core.security.thrift.TCredentials(); + struct.credentials.read(iprot); + struct.setCredentialsIsSet(true); + } + if (incoming.get(2)) { + { + org.apache.thrift.protocol.TList _list319 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32()); + struct.filenames = new ArrayList<String>(_list319.size); + for (int _i320 = 0; _i320 < _list319.size; ++_i320) + { + String _elem321; + _elem321 = iprot.readString(); + struct.filenames.add(_elem321); + } + } + struct.setFilenamesIsSet(true); + } + } + } + + } + public static class getActiveLogs_args implements org.apache.thrift.TBase<getActiveLogs_args, getActiveLogs_args._Fields>, java.io.Serializable, Cloneable, Comparable<getActiveLogs_args> { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("getActiveLogs_args"); @@ -33839,13 +34556,13 @@ import org.slf4j.LoggerFactory; case 0: // SUCCESS if (schemeField.type == org.apache.thrift.protocol.TType.LIST) { { - org.apache.thrift.protocol.TList _list314 = iprot.readListBegin(); - struct.success = new ArrayList<String>(_list314.size); - for (int _i315 = 0; _i315 < _list314.size; ++_i315) + org.apache.thrift.protocol.TList _list322 = iprot.readListBegin(); + struct.success = new ArrayList<String>(_list322.size); + for (int _i323 = 0; _i323 < _list322.size; ++_i323) { - String _elem316; - _elem316 = iprot.readString(); - struct.success.add(_elem316); + String _elem324; + _elem324 = iprot.readString(); + struct.success.add(_elem324); } iprot.readListEnd(); } @@ -33873,9 +34590,9 @@ import org.slf4j.LoggerFactory; oprot.writeFieldBegin(SUCCESS_FIELD_DESC); { oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, struct.success.size())); - for (String _iter317 : struct.success) + for (String _iter325 : struct.success) { - oprot.writeString(_iter317); + oprot.writeString(_iter325); } oprot.writeListEnd(); } @@ -33906,9 +34623,9 @@ import org.slf4j.LoggerFactory; if (struct.isSetSuccess()) { { oprot.writeI32(struct.success.size()); - for (String _iter318 : struct.success) + for (String _iter326 : struct.success) { - oprot.writeString(_iter318); + oprot.writeString(_iter326); } } } @@ -33920,13 +34637,13 @@ import org.slf4j.LoggerFactory; BitSet incoming = iprot.readBitSet(1); if (incoming.get(0)) { { - org.apache.thrift.protocol.TList _list319 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32()); - struct.success = new ArrayList<String>(_list319.size); - for (int _i320 = 0; _i320 < _list319.size; ++_i320) + org.apache.thrift.protocol.TList _list327 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32()); + struct.success = new ArrayList<String>(_list327.size); + for (int _i328 = 0; _i328 < _list327.size; ++_i328) { - String _elem321; - _elem321 = iprot.readString(); - struct.success.add(_elem321); + String _elem329; + _elem329 = iprot.readString(); + struct.success.add(_elem329); } } struct.setSuccessIsSet(true); http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/core/src/main/thrift/tabletserver.thrift ---------------------------------------------------------------------- diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift index f25a08f..4a31036 100644 --- a/core/src/main/thrift/tabletserver.thrift +++ b/core/src/main/thrift/tabletserver.thrift @@ -205,6 +205,7 @@ service TabletClientService extends client.ClientService { list<ActiveScan> getActiveScans(2:trace.TInfo tinfo, 1:security.TCredentials credentials) throws (1:client.ThriftSecurityException sec) list<ActiveCompaction> getActiveCompactions(2:trace.TInfo tinfo, 1:security.TCredentials credentials) throws (1:client.ThriftSecurityException sec) + oneway void removeLogs(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:list<string> filenames) list<string> getActiveLogs(1:trace.TInfo tinfo, 2:security.TCredentials credentials) } http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java index 270bb31..1749904 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java @@ -33,8 +33,10 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.server.AccumuloServerContext; import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; public class MetaDataStateStore extends TabletStateStore { + private static final Logger log = Logger.getLogger(MetaDataStateStore.class); private static final int THREADS = 4; private static final int LATENCY = 1000; @@ -172,6 +174,7 @@ public class MetaDataStateStore extends TabletStateStore { writer.addMutation(m); } } catch (Exception ex) { + log.error("Error marking logs as unused: " + logs); throw new DistributedStoreException(ex); } finally { try { http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java index dac7fe6..130364b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java @@ -43,9 +43,10 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Lo import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException; import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; public class MetaDataTableScanner implements ClosableIterator<TabletLocationState> { - //private static final Logger log = Logger.getLogger(MetaDataTableScanner.class); + private static final Logger log = Logger.getLogger(MetaDataTableScanner.class); BatchScanner mdScanner = null; Iterator<Entry<Key,Value>> iter = null; @@ -172,7 +173,9 @@ public class MetaDataTableScanner implements ClosableIterator<TabletLocationStat } } if (extent == null) { - throw new BadLocationStateException("No prev-row for key extent " + decodedRow, k.getRow()); + String msg = "No prev-row for key extent " + decodedRow; + log.error(msg); + throw new BadLocationStateException(msg, k.getRow()); } return new TabletLocationState(extent, future, current, last, walogs, chopped); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java index de90d98..13db05b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java @@ -87,6 +87,9 @@ public abstract class TabletStateStore implements Iterable<TabletLocationState> store.setLocations(Collections.singletonList(assignment)); } + /** + * When a server fails, its logs must be marked as unused after the log markers are moved to the tablets. + */ abstract public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance, List<String>> logs) throws DistributedStoreException; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java index a044434..66bad4e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java @@ -197,6 +197,7 @@ public class ZooTabletStateStore extends TabletStateStore { @Override public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance,List<String>> logs) { + // the root table is not replicated, so unassigning the root tablet has removed the current log marker } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java index a95cffa..ebf4b1b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java @@ -89,6 +89,7 @@ import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.accumulo.server.tablets.TabletTime; import org.apache.accumulo.server.zookeeper.ZooLock; import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -1067,7 +1068,7 @@ public class MetadataTableUtil { @Override public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException { String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS; - String[] parts = filename.split("/"); + String[] parts = StringUtils.split(filename, '/'); String uniqueId = parts[parts.length - 1]; String path = root + "/" + CurrentLogsSection.getRowPrefix() + tabletSession.toString() + uniqueId; rw.putPersistentData(path, filename.getBytes(UTF_8), NodeExistsPolicy.OVERWRITE); @@ -1075,7 +1076,7 @@ public class MetadataTableUtil { }); } else { Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + tabletSession.toString()); - m.put("log", filename, new Value(EMPTY_BYTES)); + m.put(CurrentLogsSection.COLF, new Text(filename), new Value(EMPTY_BYTES)); String tableName = MetadataTable.NAME; if (extent.isMeta()) { tableName = RootTable.NAME; @@ -1095,7 +1096,7 @@ public class MetadataTableUtil { @Override public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException { String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS; - String[] parts = filename.split("/"); + String[] parts = StringUtils.split(filename, '/'); String uniqueId = parts[parts.length - 1]; String path = root + "/" + CurrentLogsSection.getRowPrefix() + tabletSession.toString() + uniqueId; log.debug("Removing entry " + path + " from zookeeper"); http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index 2561eec..a7703e9 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@ -105,8 +105,8 @@ public class GarbageCollectWriteAheadLogs { LiveTServerSet liveServers = new LiveTServerSet(context, new Listener() { @Override public void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added) { - log.debug("New tablet server noticed: " + added); - log.debug("Tablet server removed: " + deleted); + log.debug("New tablet servers noticed: " + added); + log.debug("Tablet servers removed: " + deleted); } }); Set<TServerInstance> currentServers = liveServers.getCurrentServers(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java index bc4c64f..4490824 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java @@ -107,7 +107,7 @@ public class WorkMaker { // Don't create the record if we have nothing to do. // TODO put this into a filter on serverside if (!shouldCreateWork(status)) { - log.info("Not creating work: " + status.toString()); + log.debug("Not creating work: " + status.toString()); continue; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index b12fccc..3b7ff03 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -1715,6 +1715,11 @@ public class TabletServer extends AccumuloServerContext implements Runnable { String log = logger.getLogFile(); return Collections.singletonList(log); } + + @Override + public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException { + log.warn("Garbage collector is attempting to remove logs through the tablet server"); + } } private class SplitRunner implements Runnable { http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index 6455726..46101c1 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -37,7 +38,9 @@ import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationConfigurationUtil; +import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; +import org.apache.accumulo.core.util.SimpleThreadPool; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.fs.VolumeManager; @@ -73,8 +76,8 @@ public class TabletServerLogger { // The current logger private DfsLogger currentLog = null; - private DfsLogger nextLog = null; - private Thread nextLogThread = null; + private final AtomicReference<DfsLogger> nextLog = new AtomicReference<>(null); + private final ThreadPoolExecutor nextLogMaker = new SimpleThreadPool(1, "WALog creator"); // The current generation of logs. // Because multiple threads can be using a log at one time, a log @@ -194,16 +197,16 @@ public class TabletServerLogger { } try { - if (nextLog != null) { - log.info("Using next log " + nextLog.getFileName()); - currentLog = nextLog; - nextLog = null; + DfsLogger next = nextLog.getAndSet(null); + if (next != null) { + log.info("Using next log " + next.getFileName()); + currentLog = next; } else { DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter); alog.open(tserver.getClientAddressString()); currentLog = alog; } - if (nextLog == null) { + if (nextLog.get() == null) { createNextLog(); } logId.incrementAndGet(); @@ -217,31 +220,30 @@ public class TabletServerLogger { } } + // callers are synchronized already private void createNextLog() { - if (nextLogThread == null) { - nextLogThread = new Thread() { + if (nextLogMaker.getActiveCount() == 0) { + nextLogMaker.submit(new Runnable() { @Override public void run() { try { - log.info("Creating next WAL"); - this.setName("Creating next WAL"); + log.debug("Creating next WAL"); DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter); alog.open(tserver.getClientAddressString()); for (Tablet tablet : tserver.getOnlineTablets()) { - // TODO tserver.addLoggersToMetadata(alog, tablet.getExtent()); } - nextLog = alog; - - log.info("Created next WAL " + alog.getFileName()); + log.debug("Created next WAL " + alog.getFileName()); + alog = nextLog.getAndSet(alog); + if (alog != null) { + log.debug("closing unused next log: " + alog.getFileName()); + alog.close(); + } } catch (Exception t) { log.error(t, t); - } finally { - nextLogThread = null; } } - }; - nextLogThread.start(); + }); } } @@ -317,7 +319,7 @@ public class TabletServerLogger { // Need to release KeyExtent extent = commitSession.getExtent(); if (ReplicationConfigurationUtil.isEnabled(extent, tserver.getTableConfiguration(extent))) { - Status status = Status.newBuilder().setInfiniteEnd(true).setCreatedTime(System.currentTimeMillis()).build(); + Status status = StatusUtil.fileCreated(System.currentTimeMillis()); log.debug("Writing " + ProtobufUtil.toString(status) + " to metadata table for " + copy.getFileName()); // Got some new WALs, note this in the metadata table ReplicationTableUtil.updateFiles(tserver, commitSession.getExtent(), copy.getFileName(), status); http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index fa1ae86..0c1edfa 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -2377,6 +2377,7 @@ public class Tablet implements TabletCommitter { private ConcurrentSkipListSet<DfsLogger> currentLogs = new ConcurrentSkipListSet<DfsLogger>(); + // currentLogs may be updated while a tablet is otherwise locked public Set<DfsLogger> getCurrentLogFiles() { return new HashSet<DfsLogger>(currentLogs); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java index b8a60c1..ac7fd70 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java @@ -228,6 +228,9 @@ public class NullTserver { public List<String> getActiveLogs(TInfo tinfo, TCredentials credentials) throws TException { return null; } + + @Override + public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException { } } static class Opts extends Help {