>From Hussain Towaileb <[email protected]>: Hussain Towaileb has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20485?usp=email )
Change subject: [ASTERIXDB-3657][FAIL]: handle non-serializable exceptions ...................................................................... [ASTERIXDB-3657][FAIL]: handle non-serializable exceptions Ext-ref: MB-68744 Change-Id: I55e69623068a6c1759803cc699417021910237fd Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20485 Reviewed-by: Hussain Towaileb <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Tested-by: Hussain Towaileb <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java A hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ReplacementsAwareJavaSerializationProvider.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java A hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingIOFunction.java 7 files changed, 174 insertions(+), 10 deletions(-) Approvals: Michael Blow: Looks good to me, approved Jenkins: Verified Hussain Towaileb: Looks good to me, but someone else must approve; Verified diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index 4e35ade..1af75c1 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -25,6 +25,7 @@ import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.ACTIVE; import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.REBALANCE_REQUIRED; import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.SHUTTING_DOWN; +import static org.apache.hyracks.api.util.JavaSerializationUtils.registerReplacement; import static org.apache.hyracks.control.common.controllers.ControllerConfig.Option.CLOUD_DEPLOYMENT; import java.io.File; @@ -120,6 +121,7 @@ import org.apache.hyracks.api.job.resource.IJobCapacityController; import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager; import org.apache.hyracks.api.result.IJobResultCallback; +import org.apache.hyracks.api.util.JavaSerializationUtils.SerializableExceptionProxy; import org.apache.hyracks.control.cc.BaseCCApplication; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.common.controllers.CCConfig; @@ -137,6 +139,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import com.azure.storage.blob.models.BlobStorageException; + public class CCApplication extends BaseCCApplication { private static final Logger LOGGER = LogManager.getLogger(); @@ -154,6 +158,11 @@ ccServiceCtx.setThreadFactory( new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new LifeCycleComponentManager())); validateEnvironment(); + registerSerializationReplacements(); + } + + private void registerSerializationReplacements() { + registerReplacement(BlobStorageException.class, SerializableExceptionProxy::new); } @Override diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index 0e6a04b..da606c0 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -25,6 +25,7 @@ import static org.apache.asterix.common.utils.Servlets.QUERY_STATUS; import static org.apache.asterix.common.utils.Servlets.UDF; import static org.apache.asterix.common.utils.Servlets.UDF_RECOVERY; +import static org.apache.hyracks.api.util.JavaSerializationUtils.registerReplacement; import static org.apache.hyracks.control.common.controllers.ControllerConfig.Option.CLOUD_DEPLOYMENT; import java.io.File; @@ -102,6 +103,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IFileDeviceResolver; import org.apache.hyracks.api.job.resource.NodeCapacity; +import org.apache.hyracks.api.util.JavaSerializationUtils.SerializableExceptionProxy; import org.apache.hyracks.control.common.controllers.NCConfig; import org.apache.hyracks.control.nc.BaseNCApplication; import org.apache.hyracks.control.nc.NodeControllerService; @@ -115,6 +117,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import com.azure.storage.blob.models.BlobStorageException; + public class NCApplication extends BaseNCApplication { private static final Logger LOGGER = LogManager.getLogger(); protected NCExtensionManager ncExtensionManager; @@ -141,6 +145,11 @@ new AsterixThreadFactory(ncServiceCtx.getThreadFactory(), ncServiceCtx.getLifeCycleComponentManager())); validateEnvironment(); configurePersistedResourceRegistry(); + registerSerializationReplacements(); + } + + private void registerSerializationReplacements() { + registerReplacement(BlobStorageException.class, SerializableExceptionProxy::new); } @Override diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java index 8af7342..b820147 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java @@ -166,7 +166,8 @@ } catch (InterruptedException ex) { throw HyracksDataException.create(ex); } catch (IOException ex) { - throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ExceptionUtils.getMessageOrToString(ex)); + throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, + ExceptionUtils.getMessageOrToString(ex)); } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java index 5720fc3..711cd05 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java @@ -137,7 +137,7 @@ doValidate(testFs); } } catch (IOException ex) { - throw new CompilationException(ErrorCode.EXTERNAL_SINK_ERROR, ExceptionUtils.getMessageOrToString(ex)); + throw new CompilationException(ErrorCode.EXTERNAL_SINK_ERROR, ex, ExceptionUtils.getMessageOrToString(ex)); } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ReplacementsAwareJavaSerializationProvider.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ReplacementsAwareJavaSerializationProvider.java new file mode 100644 index 0000000..1555796 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ReplacementsAwareJavaSerializationProvider.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.comm; + +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hyracks.util.ThrowingIOFunction; + +public class ReplacementsAwareJavaSerializationProvider implements IJavaSerializationProvider { + public static final ReplacementsAwareJavaSerializationProvider INSTANCE = + new ReplacementsAwareJavaSerializationProvider(); + private static final Map<Class<?>, ThrowingIOFunction<Object, Object>> replacements = new ConcurrentHashMap<>(); + + private ReplacementsAwareJavaSerializationProvider() { + } + + @Override + public ObjectOutputStream newObjectOutputStream(OutputStream out) throws IOException { + return new ReplacementsAwareObjectOutputStream(out); + } + + public Map<Class<?>, ThrowingIOFunction<Object, Object>> getReplacements() { + return Collections.unmodifiableMap(replacements); + } + + public void registerReplacement(Class<?> clazz, ThrowingIOFunction<Object, Object> replacementFunction) { + replacements.put(clazz, replacementFunction); + } + + private static class ReplacementsAwareObjectOutputStream extends ObjectOutputStream { + public ReplacementsAwareObjectOutputStream(OutputStream out) throws IOException { + super(out); + enableReplaceObject(true); + } + + @Override + protected Object replaceObject(Object object) throws IOException { + Class<?> clazz = object.getClass(); + if (clazz.isSynthetic()) { + return super.replaceObject(object); + } + + // try exact match first (fast path) + ThrowingIOFunction<Object, Object> replacementFunction = replacements.get(clazz); + if (replacementFunction != null) { + return replacementFunction.process(object); + } + + // fallback: match by assignability (handles subclasses / interfaces) + for (Map.Entry<Class<?>, ThrowingIOFunction<Object, Object>> e : replacements.entrySet()) { + if (e.getKey().isInstance(object)) { + INSTANCE.registerReplacement(clazz, e.getValue()); + return e.getValue().process(object); + } + } + + INSTANCE.registerReplacement(clazz, super::replaceObject); + return super.replaceObject(object); + } + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java index 8e24204..f51feba 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java @@ -28,12 +28,15 @@ import java.io.Serializable; import java.lang.reflect.Modifier; import java.lang.reflect.Proxy; +import java.util.Map; -import org.apache.hyracks.api.comm.DefaultJavaSerializationProvider; import org.apache.hyracks.api.comm.IJavaSerializationProvider; +import org.apache.hyracks.api.comm.ReplacementsAwareJavaSerializationProvider; +import org.apache.hyracks.util.ThrowingIOFunction; public class JavaSerializationUtils { - private static IJavaSerializationProvider serProvider = DefaultJavaSerializationProvider.INSTANCE; + private static final ReplacementsAwareJavaSerializationProvider serProvider = + ReplacementsAwareJavaSerializationProvider.INSTANCE; private JavaSerializationUtils() { } @@ -89,10 +92,6 @@ return Class.forName(className); } - public static void setSerializationProvider(IJavaSerializationProvider serProvider) { - JavaSerializationUtils.serProvider = serProvider; - } - public static IJavaSerializationProvider getSerializationProvider() { return serProvider; } @@ -106,7 +105,7 @@ } private static class ClassLoaderObjectInputStream extends ObjectInputStream { - private ClassLoader classLoader; + private final ClassLoader classLoader; protected ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException, SecurityException { @@ -120,7 +119,7 @@ } @Override - protected Class<?> resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException { + protected Class<?> resolveProxyClass(String[] interfaces) throws ClassNotFoundException { ClassLoader nonPublicLoader = null; boolean hasNonPublicInterface = false; @@ -147,4 +146,28 @@ } } } + + public static Map<Class<?>, ThrowingIOFunction<Object, Object>> getReplacements() { + return serProvider.getReplacements(); + } + + public static <T> void registerReplacement(Class<T> clazz, ThrowingIOFunction<? super T, ?> replacementFunction) { + serProvider.registerReplacement(clazz, object -> replacementFunction.process(clazz.cast(object))); + } + + public static class SerializableExceptionProxy extends Throwable { + private static final long serialVersionUID = 1L; + private final String type; + + public SerializableExceptionProxy(Throwable t) { + super(ExceptionUtils.getMessageOrToString(t)); + this.type = t.getClass().getName(); + setStackTrace(t.getStackTrace()); + } + + @Override + public String toString() { + return type + ": " + getMessage(); + } + } } diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingIOFunction.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingIOFunction.java new file mode 100644 index 0000000..5829ae2 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingIOFunction.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.util; + +import java.io.IOException; +import java.util.function.Function; + +import com.google.common.util.concurrent.UncheckedExecutionException; + +@FunctionalInterface +public interface ThrowingIOFunction<I, R> { + R process(I input) throws IOException; + + @SuppressWarnings("Duplicates") + static <I, R> Function<I, R> asUnchecked(ThrowingIOFunction<I, R> consumer) { + return input -> { + try { + return consumer.process(input); + } catch (Exception e) { + throw new UncheckedExecutionException(e); + } + }; + } +} -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20485?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: phoenix Gerrit-Change-Id: I55e69623068a6c1759803cc699417021910237fd Gerrit-Change-Number: 20485 Gerrit-PatchSet: 10 Gerrit-Owner: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]>
