From Hussain Towaileb <[email protected]>:

Hussain Towaileb has submitted this change. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20485?usp=email )

Change subject: [ASTERIXDB-3657][FAIL]: handle non-serializable exceptions
......................................................................

[ASTERIXDB-3657][FAIL]: handle non-serializable exceptions

Ext-ref: MB-68744
Change-Id: I55e69623068a6c1759803cc699417021910237fd
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20485
Reviewed-by: Hussain Towaileb <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Michael Blow <[email protected]>
Tested-by: Hussain Towaileb <[email protected]>
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
A 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ReplacementsAwareJavaSerializationProvider.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java
A 
hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingIOFunction.java
7 files changed, 174 insertions(+), 10 deletions(-)

Approvals:
  Michael Blow: Looks good to me, approved
  Jenkins: Verified
  Hussain Towaileb: Looks good to me, but someone else must approve; Verified




diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 4e35ade..1af75c1 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -25,6 +25,7 @@
 import static 
org.apache.asterix.common.api.IClusterManagementWork.ClusterState.ACTIVE;
 import static 
org.apache.asterix.common.api.IClusterManagementWork.ClusterState.REBALANCE_REQUIRED;
 import static 
org.apache.asterix.common.api.IClusterManagementWork.ClusterState.SHUTTING_DOWN;
+import static 
org.apache.hyracks.api.util.JavaSerializationUtils.registerReplacement;
 import static 
org.apache.hyracks.control.common.controllers.ControllerConfig.Option.CLOUD_DEPLOYMENT;

 import java.io.File;
@@ -120,6 +121,7 @@
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
 import org.apache.hyracks.api.result.IJobResultCallback;
+import 
org.apache.hyracks.api.util.JavaSerializationUtils.SerializableExceptionProxy;
 import org.apache.hyracks.control.cc.BaseCCApplication;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.controllers.CCConfig;
@@ -137,6 +139,8 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

+import com.azure.storage.blob.models.BlobStorageException;
+
 public class CCApplication extends BaseCCApplication {

     private static final Logger LOGGER = LogManager.getLogger();
@@ -154,6 +158,11 @@
         ccServiceCtx.setThreadFactory(
                 new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new 
LifeCycleComponentManager()));
         validateEnvironment();
+        registerSerializationReplacements();
+    }
+
+    private void registerSerializationReplacements() {
+        registerReplacement(BlobStorageException.class, 
SerializableExceptionProxy::new);
     }

     @Override
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 0e6a04b..da606c0 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -25,6 +25,7 @@
 import static org.apache.asterix.common.utils.Servlets.QUERY_STATUS;
 import static org.apache.asterix.common.utils.Servlets.UDF;
 import static org.apache.asterix.common.utils.Servlets.UDF_RECOVERY;
+import static 
org.apache.hyracks.api.util.JavaSerializationUtils.registerReplacement;
 import static 
org.apache.hyracks.control.common.controllers.ControllerConfig.Option.CLOUD_DEPLOYMENT;

 import java.io.File;
@@ -102,6 +103,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IFileDeviceResolver;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
+import 
org.apache.hyracks.api.util.JavaSerializationUtils.SerializableExceptionProxy;
 import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.nc.BaseNCApplication;
 import org.apache.hyracks.control.nc.NodeControllerService;
@@ -115,6 +117,8 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

+import com.azure.storage.blob.models.BlobStorageException;
+
 public class NCApplication extends BaseNCApplication {
     private static final Logger LOGGER = LogManager.getLogger();
     protected NCExtensionManager ncExtensionManager;
@@ -141,6 +145,11 @@
                 new AsterixThreadFactory(ncServiceCtx.getThreadFactory(), 
ncServiceCtx.getLifeCycleComponentManager()));
         validateEnvironment();
         configurePersistedResourceRegistry();
+        registerSerializationReplacements();
+    }
+
+    private void registerSerializationReplacements() {
+        registerReplacement(BlobStorageException.class, 
SerializableExceptionProxy::new);
     }

     @Override
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index 8af7342..b820147 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -166,7 +166,8 @@
         } catch (InterruptedException ex) {
             throw HyracksDataException.create(ex);
         } catch (IOException ex) {
-            throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, 
ExceptionUtils.getMessageOrToString(ex));
+            throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, 
ex,
+                    ExceptionUtils.getMessageOrToString(ex));
         }
     }

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
index 5720fc3..711cd05 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
@@ -137,7 +137,7 @@
                 doValidate(testFs);
             }
         } catch (IOException ex) {
-            throw new CompilationException(ErrorCode.EXTERNAL_SINK_ERROR, 
ExceptionUtils.getMessageOrToString(ex));
+            throw new CompilationException(ErrorCode.EXTERNAL_SINK_ERROR, ex, 
ExceptionUtils.getMessageOrToString(ex));
         }
     }

diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ReplacementsAwareJavaSerializationProvider.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ReplacementsAwareJavaSerializationProvider.java
new file mode 100644
index 0000000..1555796
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ReplacementsAwareJavaSerializationProvider.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.comm;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hyracks.util.ThrowingIOFunction;
+
+public class ReplacementsAwareJavaSerializationProvider implements 
IJavaSerializationProvider {
+    public static final ReplacementsAwareJavaSerializationProvider INSTANCE =
+            new ReplacementsAwareJavaSerializationProvider();
+    private static final Map<Class<?>, ThrowingIOFunction<Object, Object>> 
replacements = new ConcurrentHashMap<>();
+
+    private ReplacementsAwareJavaSerializationProvider() {
+    }
+
+    @Override
+    public ObjectOutputStream newObjectOutputStream(OutputStream out) throws 
IOException {
+        return new ReplacementsAwareObjectOutputStream(out);
+    }
+
+    public Map<Class<?>, ThrowingIOFunction<Object, Object>> getReplacements() 
{
+        return Collections.unmodifiableMap(replacements);
+    }
+
+    public void registerReplacement(Class<?> clazz, ThrowingIOFunction<Object, 
Object> replacementFunction) {
+        replacements.put(clazz, replacementFunction);
+    }
+
+    private static class ReplacementsAwareObjectOutputStream extends 
ObjectOutputStream {
+        public ReplacementsAwareObjectOutputStream(OutputStream out) throws 
IOException {
+            super(out);
+            enableReplaceObject(true);
+        }
+
+        @Override
+        protected Object replaceObject(Object object) throws IOException {
+            Class<?> clazz = object.getClass();
+            if (clazz.isSynthetic()) {
+                return super.replaceObject(object);
+            }
+
+            // try exact match first (fast path)
+            ThrowingIOFunction<Object, Object> replacementFunction = 
replacements.get(clazz);
+            if (replacementFunction != null) {
+                return replacementFunction.process(object);
+            }
+
+            // fallback: match by assignability (handles subclasses / 
interfaces)
+            for (Map.Entry<Class<?>, ThrowingIOFunction<Object, Object>> e : 
replacements.entrySet()) {
+                if (e.getKey().isInstance(object)) {
+                    INSTANCE.registerReplacement(clazz, e.getValue());
+                    return e.getValue().process(object);
+                }
+            }
+
+            INSTANCE.registerReplacement(clazz, super::replaceObject);
+            return super.replaceObject(object);
+        }
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java
index 8e24204..f51feba 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java
@@ -28,12 +28,15 @@
 import java.io.Serializable;
 import java.lang.reflect.Modifier;
 import java.lang.reflect.Proxy;
+import java.util.Map;

-import org.apache.hyracks.api.comm.DefaultJavaSerializationProvider;
 import org.apache.hyracks.api.comm.IJavaSerializationProvider;
+import org.apache.hyracks.api.comm.ReplacementsAwareJavaSerializationProvider;
+import org.apache.hyracks.util.ThrowingIOFunction;

 public class JavaSerializationUtils {
-    private static IJavaSerializationProvider serProvider = 
DefaultJavaSerializationProvider.INSTANCE;
+    private static final ReplacementsAwareJavaSerializationProvider 
serProvider =
+            ReplacementsAwareJavaSerializationProvider.INSTANCE;
 
     private JavaSerializationUtils() {
     }
@@ -89,10 +92,6 @@
         return Class.forName(className);
     }

-    public static void setSerializationProvider(IJavaSerializationProvider 
serProvider) {
-        JavaSerializationUtils.serProvider = serProvider;
-    }
-
     public static IJavaSerializationProvider getSerializationProvider() {
         return serProvider;
     }
@@ -106,7 +105,7 @@
     }

     private static class ClassLoaderObjectInputStream extends 
ObjectInputStream {
-        private ClassLoader classLoader;
+        private final ClassLoader classLoader;

         protected ClassLoaderObjectInputStream(InputStream in, ClassLoader 
classLoader)
                 throws IOException, SecurityException {
@@ -120,7 +119,7 @@
         }

         @Override
-        protected Class<?> resolveProxyClass(String[] interfaces) throws 
IOException, ClassNotFoundException {
+        protected Class<?> resolveProxyClass(String[] interfaces) throws 
ClassNotFoundException {
             ClassLoader nonPublicLoader = null;
             boolean hasNonPublicInterface = false;

@@ -147,4 +146,28 @@
             }
         }
     }
+
+    public static Map<Class<?>, ThrowingIOFunction<Object, Object>> 
getReplacements() {
+        return serProvider.getReplacements();
+    }
+
+    public static <T> void registerReplacement(Class<T> clazz, 
ThrowingIOFunction<? super T, ?> replacementFunction) {
+        serProvider.registerReplacement(clazz, object -> 
replacementFunction.process(clazz.cast(object)));
+    }
+
+    public static class SerializableExceptionProxy extends Throwable {
+        private static final long serialVersionUID = 1L;
+        private final String type;
+
+        public SerializableExceptionProxy(Throwable t) {
+            super(ExceptionUtils.getMessageOrToString(t));
+            this.type = t.getClass().getName();
+            setStackTrace(t.getStackTrace());
+        }
+
+        @Override
+        public String toString() {
+            return type + ": " + getMessage();
+        }
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingIOFunction.java
 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingIOFunction.java
new file mode 100644
index 0000000..5829ae2
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingIOFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+@FunctionalInterface
+public interface ThrowingIOFunction<I, R> {
+    R process(I input) throws IOException;
+
+    @SuppressWarnings("Duplicates")
+    static <I, R> Function<I, R> asUnchecked(ThrowingIOFunction<I, R> 
consumer) {
+        return input -> {
+            try {
+                return consumer.process(input);
+            } catch (Exception e) {
+                throw new UncheckedExecutionException(e);
+            }
+        };
+    }
+}

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20485?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: merged
Gerrit-Project: asterixdb
Gerrit-Branch: phoenix
Gerrit-Change-Id: I55e69623068a6c1759803cc699417021910237fd
Gerrit-Change-Number: 20485
Gerrit-PatchSet: 10
Gerrit-Owner: Hussain Towaileb <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Hussain Towaileb <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>

Reply via email to