This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new 3783e62 [NO ISSUE][HYR] Binary compatibility enhancements 3783e62 is described below commit 3783e62c10a929f9f9bc3308622e7d2456dad06c Author: Michael Blow <mb...@apache.org> AuthorDate: Thu Jan 31 00:25:13 2019 -0500 [NO ISSUE][HYR] Binary compatibility enhancements Infrastructure & changes to enable binary compatibility with 0.9.4 Change-Id: I77d4919be4853d9afe9b0137861cff3b1d751e20 Reviewed-on: https://asterix-gerrit.ics.uci.edu/3128 Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Murtadha Hubail <mhub...@apache.org> --- .../apache/asterix/app/nc/NCAppRuntimeContext.java | 19 ++++-- .../asterix/hyracks/bootstrap/NCApplication.java | 5 +- .../metadata/bootstrap/AsterixStateProxy.java | 17 +++-- .../aggregates/std/SqlSumAggregateDescriptor.java | 13 +++- .../client/{impl => }/ClusterControllerInfo.java | 2 +- .../client}/HyracksClientInterfaceFunctions.java | 2 +- .../api/client/IHyracksClientInterface.java | 1 - .../IJavaSerializationProvider.java} | 28 +++++---- .../org/apache/hyracks/api/config/IOption.java | 10 ++- .../hyracks/api/config/SerializedOption.java | 73 ++++++++++++++++++++++ .../org/apache/hyracks/api/context/ICCContext.java | 2 +- .../hyracks/api/util/JavaSerializationUtils.java | 23 ++++++- .../client/result/ResultDirectoryRemoteProxy.java | 2 +- .../hyracks/control/cc/ClientInterfaceIPCI.java | 2 +- .../control/cc/ClusterControllerService.java | 2 +- .../hyracks/control/cc/cluster/NodeManager.java | 7 ++- .../control/cc/work/GetNodeDetailsJSONWork.java | 6 +- .../hyracks/control/cc/work/RegisterNodeWork.java | 15 +++-- .../control/cc/cluster/NodeManagerTest.java | 6 +- .../hyracks/control/common/NodeControllerData.java | 19 +++--- .../control/common/config/ConfigManager.java | 5 ++ .../hyracks/control/common/config/ConfigUtils.java | 38 ++++++++--- .../control/common/controllers/NCConfig.java | 13 +++- .../control/common/controllers/NodeParameters.java | 2 +- .../common/controllers/NodeRegistration.java | 29 +++++++-- .../impl/HyracksClientInterfaceRemoteProxy.java | 3 +- .../apache/hyracks/ipc/impl/HyracksConnection.java | 2 +- ...lizationBasedPayloadSerializerDeserializer.java | 24 ++++--- .../lsm/btree/dataflow/LSMBTreeLocalResource.java | 13 ++++ .../dataflow/LSMBTreeLocalResourceFactory.java | 13 ++++ .../NoOpCompressorDecompressorFactory.java | 8 ++- .../org/apache/hyracks/util/CompatibilityUtil.java | 58 +++++++++++++++++ 32 files changed, 374 insertions(+), 88 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index d89004b..3e72b7d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -78,8 +78,8 @@ import org.apache.asterix.runtime.utils.NoOpCoordinationService; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory; import org.apache.hyracks.api.application.INCServiceContext; +import org.apache.hyracks.api.client.ClusterControllerInfo; import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.api.client.impl.ClusterControllerInfo; import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IIOManager; @@ -435,11 +435,18 @@ public class NCAppRuntimeContext implements INcApplicationContext { if (metadataNodeStub == null) { final INetworkSecurityManager networkSecurityManager = ncServiceContext.getControllerService().getNetworkSecurityManager(); - final RMIServerFactory serverSocketFactory = new RMIServerFactory(networkSecurityManager); - final RMIClientFactory clientSocketFactory = - new RMIClientFactory(networkSecurityManager.getConfiguration().isSslEnabled()); - metadataNodeStub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE, - getMetadataProperties().getMetadataPort(), clientSocketFactory, serverSocketFactory); + + // clients need to have the client factory on their classpath- to enable older clients, only use + // our client socket factory when SSL is enabled + if (networkSecurityManager.getConfiguration().isSslEnabled()) { + final RMIServerFactory serverSocketFactory = new RMIServerFactory(networkSecurityManager); + final RMIClientFactory clientSocketFactory = new RMIClientFactory(true); + metadataNodeStub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE, + getMetadataProperties().getMetadataPort(), clientSocketFactory, serverSocketFactory); + } else { + metadataNodeStub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE, + getMetadataProperties().getMetadataPort()); + } } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index 4fa86ae..97316d2 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -64,7 +64,6 @@ import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IFileDeviceResolver; import org.apache.hyracks.api.job.resource.NodeCapacity; -import org.apache.hyracks.api.messages.IMessageBroker; import org.apache.hyracks.control.common.controllers.NCConfig; import org.apache.hyracks.control.nc.BaseNCApplication; import org.apache.hyracks.control.nc.NodeControllerService; @@ -132,10 +131,10 @@ public class NCApplication extends BaseNCApplication { } runtimeContext.initialize(getRecoveryManagerFactory(), runtimeContext.getNodeProperties().isInitialRun()); MessagingProperties messagingProperties = runtimeContext.getMessagingProperties(); - IMessageBroker messageBroker = new NCMessageBroker(controllerService, messagingProperties); + NCMessageBroker messageBroker = new NCMessageBroker(controllerService, messagingProperties); this.ncServiceCtx.setMessageBroker(messageBroker); MessagingChannelInterfaceFactory interfaceFactory = - new MessagingChannelInterfaceFactory((NCMessageBroker) messageBroker, messagingProperties); + new MessagingChannelInterfaceFactory(messageBroker, messagingProperties); this.ncServiceCtx.setMessagingChannelInterfaceFactory(interfaceFactory); final Checkpoint latestCheckpoint = runtimeContext.getTransactionSubsystem().getCheckpointManager().getLatest(); if (latestCheckpoint != null && latestCheckpoint.getStorageVersion() != StorageConstants.VERSION) { diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java index 4c971e2..d6af749 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java @@ -43,11 +43,18 @@ public class AsterixStateProxy implements IAsterixStateProxy { public static IAsterixStateProxy registerRemoteObject(INetworkSecurityManager networkSecurityManager, int metadataCallbackPort) throws RemoteException { - final RMIServerFactory serverSocketFactory = new RMIServerFactory(networkSecurityManager); - final RMIClientFactory clientSocketFactory = - new RMIClientFactory(networkSecurityManager.getConfiguration().isSslEnabled()); - final IAsterixStateProxy stub = (IAsterixStateProxy) UnicastRemoteObject.exportObject(cc, metadataCallbackPort, - clientSocketFactory, serverSocketFactory); + IAsterixStateProxy stub; + // clients need to have the client factory on their classpath- to enable older clients, only use + // our client socket factory when SSL is enabled + if (networkSecurityManager.getConfiguration().isSslEnabled()) { + final RMIServerFactory serverSocketFactory = new RMIServerFactory(networkSecurityManager); + final RMIClientFactory clientSocketFactory = + new RMIClientFactory(networkSecurityManager.getConfiguration().isSslEnabled()); + stub = (IAsterixStateProxy) UnicastRemoteObject.exportObject(cc, metadataCallbackPort, clientSocketFactory, + serverSocketFactory); + } else { + stub = (IAsterixStateProxy) UnicastRemoteObject.exportObject(cc, metadataCallbackPort); + } LOGGER.info("Asterix Distributed State Proxy Bound"); return stub; } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java index 6e73a40..779fba2 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java @@ -19,6 +19,7 @@ package org.apache.asterix.runtime.aggregates.std; import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptor; import org.apache.asterix.om.functions.IFunctionDescriptorFactory; import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor; import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; @@ -31,7 +32,17 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; public class SqlSumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor { private static final long serialVersionUID = 1L; - public static final IFunctionDescriptorFactory FACTORY = SqlSumAggregateDescriptor::new; + + // this must remain an anonymous inner class due to the evaluator factory below being an anonymous inner + // serializable class, to not break binary compatibility + // this can be reverted once serialization compatibility code is in place to write the correct class + @SuppressWarnings({ "Anonymous2MethodRef", "Convert2Lambda" }) + public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { + @Override + public IFunctionDescriptor createFunctionDescriptor() { + return new SqlSumAggregateDescriptor(); + } + }; @Override public FunctionIdentifier getIdentifier() { diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ClusterControllerInfo.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java similarity index 97% rename from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ClusterControllerInfo.java rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java index 7d0dd61..c0445a7 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ClusterControllerInfo.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.api.client.impl; +package org.apache.hyracks.api.client; import java.io.Serializable; diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java similarity index 99% rename from hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceFunctions.java rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java index a61c96d..72bdc3e 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.ipc.impl; +package org.apache.hyracks.api.client; import java.io.Serializable; import java.net.URL; diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java index e92db5e..4cc47d2 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java @@ -23,7 +23,6 @@ import java.util.EnumSet; import java.util.List; import java.util.Map; -import org.apache.hyracks.api.client.impl.ClusterControllerInfo; import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.deployment.DeploymentId; import org.apache.hyracks.api.job.DeployedJobSpecId; diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IJavaSerializationProvider.java similarity index 55% copy from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java copy to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IJavaSerializationProvider.java index c3da155..deaa966 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IJavaSerializationProvider.java @@ -16,20 +16,24 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.api.context; +package org.apache.hyracks.api.comm; -import java.net.InetAddress; -import java.util.Map; -import java.util.Set; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; -import org.apache.hyracks.api.client.impl.ClusterControllerInfo; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.topology.ClusterTopology; +public interface IJavaSerializationProvider { + default ObjectOutputStream newObjectOutputStream(OutputStream out) throws IOException { + return new ObjectOutputStream(out); + } -public interface ICCContext { - ClusterControllerInfo getClusterControllerInfo(); + default ObjectInputStream newObjectInputStream(InputStream in) throws IOException { + return new ObjectInputStream(in); + } - void getIPAddressNodeMap(Map<InetAddress, Set<String>> map) throws HyracksDataException; - - ClusterTopology getClusterTopology(); + default void readObject(ObjectInputStream in, Object object) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOption.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOption.java index 5f11214..c66b4fa 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOption.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOption.java @@ -65,7 +65,7 @@ public interface IOption { } default String ini() { - return name().toLowerCase().replace("_", "."); + return toIni(name()); } default String camelCase() { @@ -75,4 +75,12 @@ public interface IOption { default String toIniString() { return "[" + section().sectionName() + "] " + ini(); } + + static String toIni(String name) { + return name.toLowerCase().replace("_", "."); + } + + default SerializedOption toSerializable() { + return new SerializedOption(this); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/SerializedOption.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/SerializedOption.java new file mode 100644 index 0000000..8263af6 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/SerializedOption.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.config; + +import java.io.Serializable; +import java.util.Objects; + +public final class SerializedOption implements Serializable, Comparable { + private static final long serialVersionUID = 1L; + private final String sectionName; + private final String optionName; + + SerializedOption(IOption option) { + this.sectionName = option.section().name(); + this.optionName = option.name(); + } + + public String optionName() { + return optionName; + } + + public Section section() { + return Section.valueOf(sectionName); + } + + @Override + public int compareTo(Object o) { + if (!(o instanceof SerializedOption)) { + return -1; + } + SerializedOption that = (SerializedOption) o; + int sectionComp = sectionName.compareTo(that.sectionName); + return sectionComp != 0 ? sectionComp : optionName.compareTo(that.optionName); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SerializedOption that = (SerializedOption) o; + return Objects.equals(sectionName, that.sectionName) && Objects.equals(optionName, that.optionName); + } + + @Override + public int hashCode() { + return Objects.hash(sectionName, optionName); + } + + @Override + public String toString() { + return "[" + sectionName + "] " + optionName; + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java index c3da155..83e0482 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java @@ -22,7 +22,7 @@ import java.net.InetAddress; import java.util.Map; import java.util.Set; -import org.apache.hyracks.api.client.impl.ClusterControllerInfo; +import org.apache.hyracks.api.client.ClusterControllerInfo; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.topology.ClusterTopology; diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java index a92e700..b38d343 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java @@ -29,20 +29,25 @@ import java.io.Serializable; import java.lang.reflect.Modifier; import java.lang.reflect.Proxy; +import org.apache.hyracks.api.comm.IJavaSerializationProvider; + public class JavaSerializationUtils { + private static IJavaSerializationProvider serProvider = new IJavaSerializationProvider() { + }; + public static byte[] serialize(Serializable jobSpec) throws IOException { if (jobSpec instanceof byte[]) { return (byte[]) jobSpec; } ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); + ObjectOutputStream oos = serProvider.newObjectOutputStream(baos); oos.writeObject(jobSpec); return baos.toByteArray(); } public static byte[] serialize(Serializable jobSpec, ClassLoader classLoader) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); + ObjectOutputStream oos = serProvider.newObjectOutputStream(baos); ClassLoader ctxCL = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(classLoader); @@ -57,7 +62,7 @@ public class JavaSerializationUtils { if (bytes == null) { return null; } - ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)); + ObjectInputStream ois = serProvider.newObjectInputStream(new ByteArrayInputStream(bytes)); return ois.readObject(); } @@ -78,6 +83,18 @@ public class JavaSerializationUtils { return Class.forName(className); } + public static void setSerializationProvider(IJavaSerializationProvider serProvider) { + JavaSerializationUtils.serProvider = serProvider; + } + + public static IJavaSerializationProvider getSerializationProvider() { + return serProvider; + } + + public static void readObject(ObjectInputStream in, Object object) throws IOException, ClassNotFoundException { + serProvider.readObject(in, object); + } + private static class ClassLoaderObjectInputStream extends ObjectInputStream { private ClassLoader classLoader; diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java index e802ef9..6260dd6 100644 --- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java @@ -25,7 +25,7 @@ import org.apache.hyracks.api.result.ResultJobRecord.Status; import org.apache.hyracks.api.result.ResultSetId; import org.apache.hyracks.ipc.api.IIPCHandle; import org.apache.hyracks.ipc.api.RPCInterface; -import org.apache.hyracks.ipc.impl.HyracksClientInterfaceFunctions; +import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions; //TODO(madhusudancs): Should this implementation be moved to org.apache.hyracks.client? public class ResultDirectoryRemoteProxy implements IResultDirectory { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java index f2ea988..deb6785 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java @@ -44,7 +44,7 @@ import org.apache.hyracks.control.common.work.IPCResponder; import org.apache.hyracks.ipc.api.IIPCHandle; import org.apache.hyracks.ipc.api.IIPCI; import org.apache.hyracks.ipc.exceptions.IPCException; -import org.apache.hyracks.ipc.impl.HyracksClientInterfaceFunctions; +import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java index c2e7b22..419dff6 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java @@ -36,7 +36,7 @@ import java.util.TreeMap; import java.util.concurrent.ExecutorService; import org.apache.hyracks.api.application.ICCApplication; -import org.apache.hyracks.api.client.impl.ClusterControllerInfo; +import org.apache.hyracks.api.client.ClusterControllerInfo; import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.config.IApplicationConfig; import org.apache.hyracks.api.context.ICCContext; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java index 4f76ced..3e72942 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java @@ -46,6 +46,7 @@ import org.apache.hyracks.control.cc.job.IJobManager; import org.apache.hyracks.control.cc.job.JobRun; import org.apache.hyracks.control.cc.scheduler.IResourceManager; import org.apache.hyracks.control.common.controllers.CCConfig; +import org.apache.hyracks.control.common.controllers.NCConfig; import org.apache.hyracks.ipc.exceptions.IPCException; import org.apache.hyracks.util.annotations.Idempotent; import org.apache.hyracks.util.annotations.NotThreadSafe; @@ -218,7 +219,7 @@ public class NodeManager implements INodeManager { } private InetAddress getIpAddress(NodeControllerState ncState) throws HyracksException { - String ipAddress = ncState.getNCConfig().getDataPublicAddress(); + String ipAddress = (String) ncState.getConfig().get(NCConfig.Option.DATA_PUBLIC_ADDRESS.toSerializable()); try { return InetAddress.getByName(ipAddress); } catch (UnknownHostException e) { @@ -237,8 +238,8 @@ public class NodeManager implements INodeManager { state.getNodeController().shutdown(false); LOGGER.warn("Request to shutdown failed node {} succeeded. false positive heartbeat miss indication", nodeId); - } catch (Exception ignore) { - LOGGER.debug(() -> "Ignoring failure on ensuring node " + nodeId + " has failed", ignore); + } catch (Exception ex) { + LOGGER.debug(() -> "Ignoring failure on ensuring node " + nodeId + " has failed", ex); } }); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java index aa7dca1..30115ff 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java @@ -37,7 +37,6 @@ import org.apache.hyracks.control.cc.NodeControllerState; import org.apache.hyracks.control.cc.cluster.INodeManager; import org.apache.hyracks.control.common.config.ConfigUtils; import org.apache.hyracks.control.common.controllers.CCConfig; -import org.apache.hyracks.control.common.controllers.NCConfig; import org.apache.hyracks.control.common.work.IPCResponder; import org.apache.hyracks.control.common.work.SynchronizableWork; import org.apache.hyracks.util.MXHelper; @@ -89,10 +88,7 @@ public class GetNodeDetailsJSONWork extends SynchronizableWork { if (ncs != null) { detail = ncs.toDetailedJSON(includeStats, includeConfig); if (includeConfig) { - final NCConfig ncConfig = ncs.getNCConfig(); - ConfigUtils.addConfigToJSON(detail, ncConfig.getConfigManager().getNodeEffectiveConfig(nodeId), - NC_SECTIONS); - detail.putPOJO("app.args", ncConfig.getAppArgs()); + ConfigUtils.addConfigToJSON(detail, ncs.getConfig(), ccConfig.getConfigManager(), NC_SECTIONS); } } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java index fe33bc9..0ea9239 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java @@ -21,11 +21,11 @@ package org.apache.hyracks.control.cc.work; import java.util.HashMap; import java.util.Map; -import org.apache.hyracks.api.config.IApplicationConfig; import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.NodeControllerState; import org.apache.hyracks.control.cc.cluster.INodeManager; +import org.apache.hyracks.control.common.config.ConfigManager; import org.apache.hyracks.control.common.controllers.NodeParameters; import org.apache.hyracks.control.common.controllers.NodeRegistration; import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy; @@ -62,11 +62,16 @@ public class RegisterNodeWork extends SynchronizableWork { try { NodeControllerState state = new NodeControllerState(nc, reg); nodeManager.addNode(id, state); - IApplicationConfig cfg = state.getNCConfig().getConfigManager().getNodeEffectiveConfig(id); final Map<IOption, Object> ncConfiguration = new HashMap<>(); - for (IOption option : cfg.getOptions()) { - ncConfiguration.put(option, cfg.get(option)); - } + ConfigManager configManager = ccs.getConfig().getConfigManager(); + state.getConfig().forEach((key, value) -> { + IOption option = configManager.lookupOption(key); + if (option == null) { + LOGGER.info("discarding unknown option {}", key); + } else { + ncConfiguration.put(option, value); + } + }); LOGGER.info("registered node: {}", id); nc.sendRegistrationResult(params, null); ccs.getContext().notifyNodeJoin(id, ncConfiguration); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java index 9d755a0..d92727c 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java @@ -22,6 +22,7 @@ package org.apache.hyracks.control.cc.cluster; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.Collections; import java.util.concurrent.Executors; import org.apache.hyracks.api.comm.NetworkAddress; @@ -174,9 +175,8 @@ public class NodeManagerTest { when(ncState.getDataPort()).thenReturn(dataAddr); when(ncState.getResultPort()).thenReturn(resultAddr); when(ncState.getMessagingPort()).thenReturn(msgAddr); - NCConfig ncConfig = new NCConfig(nodeId); - ncConfig.setDataPublicAddress(ipAddr); - when(ncState.getNCConfig()).thenReturn(ncConfig); + when(ncState.getConfig()) + .thenReturn(Collections.singletonMap(NCConfig.Option.DATA_PUBLIC_ADDRESS.toSerializable(), ipAddr)); Mockito.when(ncState.getNodeController()).thenReturn(ncProxy); return ncState; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java index 1926da6..c37acab 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java @@ -21,6 +21,7 @@ package org.apache.hyracks.control.common; import static org.apache.hyracks.control.common.utils.ConfigurationUtil.toPathElements; import static org.apache.hyracks.util.JSONUtil.put; +import java.util.Collections; import java.util.Date; import java.util.HashSet; import java.util.List; @@ -28,9 +29,9 @@ import java.util.Map; import java.util.Set; import org.apache.hyracks.api.comm.NetworkAddress; +import org.apache.hyracks.api.config.SerializedOption; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.resource.NodeCapacity; -import org.apache.hyracks.control.common.controllers.NCConfig; import org.apache.hyracks.control.common.controllers.NodeRegistration; import org.apache.hyracks.control.common.heartbeat.HeartbeatData; import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema; @@ -42,7 +43,9 @@ public class NodeControllerData { private static final int RRD_SIZE = 720; - private final NCConfig ncConfig; + private final String nodeId; + + private final Map<SerializedOption, Object> config; private final NetworkAddress dataPort; @@ -145,7 +148,9 @@ public class NodeControllerData { private NodeCapacity capacity; public NodeControllerData(NodeRegistration reg) { - ncConfig = reg.getNCConfig(); + nodeId = reg.getNodeId(); + config = Collections.unmodifiableMap(reg.getConfig()); + dataPort = reg.getDataPort(); resultPort = reg.getResultPort(); messagingPort = reg.getMessagingPort(); @@ -252,8 +257,8 @@ public class NodeControllerData { return System.nanoTime() - lastHeartbeatNanoTime; } - public NCConfig getNCConfig() { - return ncConfig; + public Map<SerializedOption, Object> getConfig() { + return config; } public Set<JobId> getActiveJobIds() { @@ -279,7 +284,7 @@ public class NodeControllerData { public synchronized ObjectNode toSummaryJSON() { ObjectMapper om = new ObjectMapper(); ObjectNode o = om.createObjectNode(); - put(o, "node-id", ncConfig.getNodeId()); + put(o, "node-id", nodeId); put(o, "heap-used", heapUsedSize[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]); put(o, "system-load-average", systemLoadAverage[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]); @@ -290,7 +295,7 @@ public class NodeControllerData { ObjectMapper om = new ObjectMapper(); ObjectNode o = om.createObjectNode(); - put(o, "node-id", ncConfig.getNodeId()); + put(o, "node-id", nodeId); if (includeConfig) { put(o, "os-name", osName); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java index 1dae48c..44fa0bc 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java @@ -52,6 +52,7 @@ import org.apache.hyracks.api.config.IConfigManager; import org.apache.hyracks.api.config.IConfigurator; import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.api.config.Section; +import org.apache.hyracks.api.config.SerializedOption; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.control.common.application.ConfigManagerApplicationConfig; import org.apache.logging.log4j.Level; @@ -210,6 +211,10 @@ public class ConfigManager implements IConfigManager, Serializable { this.versionString = versionString; } + public IOption lookupOption(SerializedOption option) { + return lookupOption(option.section().sectionName(), IOption.toIni(option.optionName())); + } + public IOption lookupOption(String section, String key) { Map<String, IOption> map = getSectionOptionMap(Section.parseSectionName(section)); return map == null ? null : map.get(key); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigUtils.java index 4fa9b56..1cc739c 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigUtils.java @@ -34,6 +34,8 @@ import java.util.function.Predicate; import org.apache.hyracks.api.config.IApplicationConfig; import org.apache.hyracks.api.config.IOption; +import org.apache.hyracks.api.config.Section; +import org.apache.hyracks.api.config.SerializedOption; import org.apache.hyracks.control.common.controllers.ControllerConfig; import org.ini4j.Ini; import org.kohsuke.args4j.CmdLineException; @@ -150,23 +152,21 @@ public class ConfigUtils { return value; } - public static String getString(Ini ini, org.apache.hyracks.api.config.Section section, IOption option, - String defaultValue) { + public static String getString(Ini ini, Section section, IOption option, String defaultValue) { return getString(ini, section.sectionName(), option.ini(), defaultValue); } - public static void addConfigToJSON(ObjectNode o, IApplicationConfig cfg, - org.apache.hyracks.api.config.Section... sections) { + public static void addConfigToJSON(ObjectNode o, IApplicationConfig cfg, Section... sections) { ArrayNode configArray = o.putArray("config"); - for (org.apache.hyracks.api.config.Section section : cfg.getSections(Arrays.asList(sections)::contains)) { + for (Section section : cfg.getSections(Arrays.asList(sections)::contains)) { ObjectNode sectionNode = configArray.addObject(); Map<String, Object> sectionConfig = getSectionOptionsForJSON(cfg, section, option -> true); sectionNode.put("section", section.sectionName()).putPOJO("properties", sectionConfig); } } - public static Map<String, Object> getSectionOptionsForJSON(IApplicationConfig cfg, - org.apache.hyracks.api.config.Section section, Predicate<IOption> selector) { + public static Map<String, Object> getSectionOptionsForJSON(IApplicationConfig cfg, Section section, + Predicate<IOption> selector) { Map<String, Object> sectionConfig = new TreeMap<>(); for (IOption option : cfg.getOptions(section)) { if (selector.test(option)) { @@ -175,4 +175,28 @@ public class ConfigUtils { } return sectionConfig; } + + public static void addConfigToJSON(ObjectNode o, Map<SerializedOption, Object> config, ConfigManager configManager, + Section... sections) { + ArrayNode configArray = o.putArray("config"); + for (Section section : sections) { + ObjectNode sectionNode = configArray.addObject(); + Map<String, Object> sectionConfig = new TreeMap<>(); + config.entrySet().stream().filter(e -> e.getKey().section().equals(section)).forEach(entry -> sectionConfig + .put(IOption.toIni(entry.getKey().optionName()), fixupValueForJSON(entry, configManager))); + if (!sectionConfig.isEmpty()) { + sectionNode.put("section", section.sectionName()).putPOJO("properties", sectionConfig); + } + } + } + + private static Object fixupValueForJSON(Map.Entry<SerializedOption, Object> entry, ConfigManager configManager) { + IOption option = configManager.lookupOption(entry.getKey()); + if (option != null) { + // use the type system for the option to serialize this + return option.type().serializeToJSON(entry.getValue()); + } + // not much we can do, let default JSON serialization do its thing + return entry.getValue(); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java index d41350f..acfa394 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java @@ -18,7 +18,6 @@ */ package org.apache.hyracks.control.common.controllers; -import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN; import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER; import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT; import static org.apache.hyracks.control.common.config.OptionTypes.LONG; @@ -260,10 +259,18 @@ public class NCConfig extends ControllerConfig { } public NCConfig(String nodeId, ConfigManager configManager) { + this(nodeId, configManager, true); + } + + public NCConfig(String nodeId, ConfigManager configManager, boolean selfRegister) { super(configManager); this.appConfig = nodeId == null ? configManager.getAppConfig() : configManager.getNodeEffectiveConfig(nodeId); - configManager.register(Option.class); - configManager.register(ControllerConfig.Option.class); + if (selfRegister) { + configManager.register(Option.class); + configManager.register(ControllerConfig.Option.class); + } else { + configManager.register(Option.NODE_ID); + } setNodeId(nodeId); this.nodeId = nodeId; configManager.registerArgsListener(appArgs::addAll); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java index d9165e1..e78a423 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java @@ -20,7 +20,7 @@ package org.apache.hyracks.control.common.controllers; import java.io.Serializable; -import org.apache.hyracks.api.client.impl.ClusterControllerInfo; +import org.apache.hyracks.api.client.ClusterControllerInfo; public class NodeParameters implements Serializable { private static final long serialVersionUID = 1L; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java index b4d835d..f76d9b8 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java @@ -21,13 +21,20 @@ package org.apache.hyracks.control.common.controllers; import static org.apache.hyracks.util.MXHelper.osMXBean; import static org.apache.hyracks.util.MXHelper.runtimeMXBean; +import java.io.IOException; +import java.io.ObjectInputStream; import java.io.Serializable; import java.net.InetSocketAddress; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hyracks.api.comm.NetworkAddress; +import org.apache.hyracks.api.config.IApplicationConfig; +import org.apache.hyracks.api.config.IOption; +import org.apache.hyracks.api.config.SerializedOption; import org.apache.hyracks.api.job.resource.NodeCapacity; +import org.apache.hyracks.api.util.JavaSerializationUtils; import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema; import org.apache.hyracks.util.MXHelper; import org.apache.hyracks.util.PidHelper; @@ -39,6 +46,8 @@ public final class NodeRegistration implements Serializable { private final String nodeId; + @Deprecated // required for binary backward-compatibility when registering with a 0.9.4 CC + @SuppressWarnings("unused") private final NCConfig ncConfig; private final NetworkAddress dataPort; @@ -77,11 +86,12 @@ public final class NodeRegistration implements Serializable { private final NodeCapacity capacity; + private final HashMap<SerializedOption, Object> config; + public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort, NetworkAddress resultPort, HeartbeatSchema hbSchema, NetworkAddress messagingPort, NodeCapacity capacity) { this.ncAddress = ncAddress; this.nodeId = nodeId; - this.ncConfig = ncConfig; this.dataPort = dataPort; this.resultPort = resultPort; this.hbSchema = hbSchema; @@ -100,6 +110,12 @@ public final class NodeRegistration implements Serializable { this.inputArguments = runtimeMXBean.getInputArguments(); this.systemProperties = runtimeMXBean.getSystemProperties(); this.pid = PidHelper.getPid(); + IApplicationConfig cfg = ncConfig.getConfigManager().getNodeEffectiveConfig(nodeId); + this.config = new HashMap<>(); + for (IOption option : cfg.getOptions()) { + config.put(option.toSerializable(), cfg.get(option)); + } + this.ncConfig = null; } public InetSocketAddress getNodeControllerAddress() { @@ -114,8 +130,8 @@ public final class NodeRegistration implements Serializable { return capacity; } - public NCConfig getNCConfig() { - return ncConfig; + public Map<SerializedOption, Object> getConfig() { + return config; } public NetworkAddress getDataPort() { @@ -185,4 +201,9 @@ public final class NodeRegistration implements Serializable { public int getPid() { return pid; } -} + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + JavaSerializationUtils.readObject(in, this); + } + +} \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java index 3fac7da..4f27ec1 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java @@ -24,9 +24,10 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions; import org.apache.hyracks.api.client.IHyracksClientInterface; import org.apache.hyracks.api.client.NodeControllerInfo; -import org.apache.hyracks.api.client.impl.ClusterControllerInfo; +import org.apache.hyracks.api.client.ClusterControllerInfo; import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.deployment.DeploymentId; import org.apache.hyracks.api.job.DeployedJobSpecId; diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java index 2c7e82e..faae14b 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java @@ -40,7 +40,7 @@ import org.apache.http.impl.client.DefaultHttpClient; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.client.IHyracksClientInterface; import org.apache.hyracks.api.client.NodeControllerInfo; -import org.apache.hyracks.api.client.impl.ClusterControllerInfo; +import org.apache.hyracks.api.client.ClusterControllerInfo; import org.apache.hyracks.api.client.impl.JobSpecificationActivityClusterGraphGeneratorFactory; import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.deployment.DeploymentId; diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java index c4263d2..439f230 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java @@ -18,8 +18,11 @@ */ package org.apache.hyracks.ipc.impl; +import static org.apache.hyracks.api.util.JavaSerializationUtils.getSerializationProvider; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.OutputStream; @@ -28,6 +31,7 @@ import java.nio.ByteBuffer; import org.apache.hyracks.ipc.api.IPayloadSerializerDeserializer; public class JavaSerializationBasedPayloadSerializerDeserializer implements IPayloadSerializerDeserializer { + @Override public Object deserializeObject(ByteBuffer buffer, int length) throws Exception { return deserialize(buffer, length); @@ -48,21 +52,23 @@ public class JavaSerializationBasedPayloadSerializerDeserializer implements IPay return serialize(exception); } - public static void serialize(OutputStream out, Object object) throws Exception { - ObjectOutputStream oos = new ObjectOutputStream(out); - oos.writeObject(object); - oos.flush(); + public static void serialize(OutputStream out, Object object) throws IOException { + try (ObjectOutputStream oos = getSerializationProvider().newObjectOutputStream(out)) { + oos.writeObject(object); + oos.flush(); + } } private Object deserialize(ByteBuffer buffer, int length) throws Exception { - ObjectInputStream ois = - new ObjectInputStream(new ByteArrayInputStream(buffer.array(), buffer.position(), length)); - Object object = ois.readObject(); - ois.close(); + Object object; + try (ObjectInputStream ois = getSerializationProvider() + .newObjectInputStream(new ByteArrayInputStream(buffer.array(), buffer.position(), length))) { + object = ois.readObject(); + } return object; } - private byte[] serialize(Object object) throws Exception { + private byte[] serialize(Object object) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); serialize(baos, object); baos.close(); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java index 7d5beff..cdd3ea2 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java @@ -18,6 +18,7 @@ */ package org.apache.hyracks.storage.am.lsm.btree.dataflow; +import java.io.IOException; import java.util.List; import java.util.Map; @@ -42,6 +43,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider; import org.apache.hyracks.storage.am.lsm.common.dataflow.LsmResource; import org.apache.hyracks.storage.common.IStorageManager; import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory; +import org.apache.hyracks.util.CompatibilityUtil; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -133,4 +135,15 @@ public class LSMBTreeLocalResource extends LsmResource { json.putPOJO("btreeFields", btreeFields); json.putPOJO("compressorDecompressorFactory", compressorDecompressorFactory.toJson(registry)); } + + private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + + // compat w/ 0.3.4 + if (compressorDecompressorFactory == null) { + CompatibilityUtil.writeField(this, "compressorDecompressorFactory", + NoOpCompressorDecompressorFactory.INSTANCE); + } + } + } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java index ea41c3d..740b3d7 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java @@ -18,6 +18,7 @@ */ package org.apache.hyracks.storage.am.lsm.btree.dataflow; +import java.io.IOException; import java.util.Map; import org.apache.hyracks.api.compression.ICompressorDecompressorFactory; @@ -33,6 +34,8 @@ import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider; import org.apache.hyracks.storage.am.lsm.common.dataflow.LsmResource; import org.apache.hyracks.storage.am.lsm.common.dataflow.LsmResourceFactory; import org.apache.hyracks.storage.common.IStorageManager; +import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory; +import org.apache.hyracks.util.CompatibilityUtil; public class LSMBTreeLocalResourceFactory extends LsmResourceFactory { @@ -69,4 +72,14 @@ public class LSMBTreeLocalResourceFactory extends LsmResourceFactory { filterTypeTraits, filterCmpFactories, btreeFields, filterFields, opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, durable, compressorDecompressorFactory); } + + private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + + // compat w/ 0.3.4 + if (compressorDecompressorFactory == null) { + CompatibilityUtil.writeField(this, "compressorDecompressorFactory", + NoOpCompressorDecompressorFactory.INSTANCE); + } + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressorFactory.java index 690f4a2..90a4b56 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressorFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressorFactory.java @@ -18,6 +18,8 @@ */ package org.apache.hyracks.storage.common.compression; +import java.io.ObjectStreamException; + import org.apache.hyracks.api.compression.ICompressorDecompressor; import org.apache.hyracks.api.compression.ICompressorDecompressorFactory; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -26,7 +28,7 @@ import org.apache.hyracks.api.io.IPersistedResourceRegistry; import com.fasterxml.jackson.databind.JsonNode; -public class NoOpCompressorDecompressorFactory implements ICompressorDecompressorFactory { +public final class NoOpCompressorDecompressorFactory implements ICompressorDecompressorFactory { private static final long serialVersionUID = 1L; public static final ICompressorDecompressorFactory INSTANCE = new NoOpCompressorDecompressorFactory(); @@ -44,4 +46,8 @@ public class NoOpCompressorDecompressorFactory implements ICompressorDecompresso public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json) { return INSTANCE; } + + private Object readResolve() throws ObjectStreamException { + return INSTANCE; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/CompatibilityUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/CompatibilityUtil.java new file mode 100644 index 0000000..392955a --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/CompatibilityUtil.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.util; + +import java.io.IOException; +import java.lang.reflect.Field; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class CompatibilityUtil { + private static final Logger LOGGER = LogManager.getLogger(); + + private CompatibilityUtil() { + } + + public static Object readField(Object obj, String fieldName) throws IOException { + Class<?> objClass = obj.getClass(); + LOGGER.debug("reading field '{}' on object of type {}", fieldName, objClass); + try { + Field f = objClass.getDeclaredField(fieldName); + f.setAccessible(true); + return f.get(obj); + } catch (NoSuchFieldException | IllegalAccessException e) { + LOGGER.warn("exception reading field '{}' on object of type {}", fieldName, objClass, e); + throw new IOException(e); + } + } + + public static void writeField(Object obj, String fieldName, Object newValue) throws IOException { + Class<?> objClass = obj.getClass(); + LOGGER.debug("updating field '{}' on object of type {} to {}", fieldName, objClass, newValue); + try { + Field f = objClass.getDeclaredField(fieldName); + f.setAccessible(true); + f.set(obj, newValue); + } catch (NoSuchFieldException | IllegalAccessException e) { + LOGGER.warn("exception updating field '{}' object of type {} to {}", fieldName, objClass, newValue, e); + throw new IOException(e); + } + } +}