This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch rt-oal in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit eb12d6b1efa0201b91f4e2a8d2911f7331bffe7e Author: Wu Sheng <[email protected]> AuthorDate: Tue Jul 16 23:44:42 2019 +0800 Set up the basic structure of new OAL engine. --- .../apm/agent/core/boot/AgentPackagePath.java | 3 +- .../org/apache/skywalking/oal/rt/OALRuntime.java | 65 ++++++++++++++++++++-- .../oap/server/core/CoreModuleProvider.java | 16 +++++- .../skywalking/oap/server/core/WorkPath.java | 45 +++++++-------- .../server/core/analysis/DispatcherManager.java | 65 ++++++++++++---------- .../oap/server/core/oal/rt/OALEngine.java | 5 ++ 6 files changed, 136 insertions(+), 63 deletions(-) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/boot/AgentPackagePath.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/boot/AgentPackagePath.java index a7574ef..f391a2e 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/boot/AgentPackagePath.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/boot/AgentPackagePath.java @@ -72,7 +72,8 @@ public class AgentPackagePath { return agentJarFile.getParentFile(); } } else { - String classLocation = urlString.substring(urlString.indexOf("file:"), urlString.length() - classResourcePath.length()); + int prefixLength = "file:".length(); + String classLocation = urlString.substring(prefixLength, urlString.length() - classResourcePath.length()); return new File(classLocation); } } diff --git a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/OALRuntime.java b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/OALRuntime.java index 61de5a2..e2a2426 100644 --- a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/OALRuntime.java +++ b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/OALRuntime.java @@ -20,11 +20,14 @@ package org.apache.skywalking.oal.rt; import freemarker.template.Configuration; import freemarker.template.Version; +import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.PrintWriter; import java.io.Reader; import java.io.StringWriter; import java.nio.charset.Charset; +import java.util.ArrayList; import java.util.List; import java.util.Locale; import javassist.CannotCompileException; @@ -54,7 +57,9 @@ import org.apache.skywalking.oal.rt.parser.OALScripts; import org.apache.skywalking.oal.rt.parser.ScriptParser; import org.apache.skywalking.oal.rt.parser.SourceColumn; import org.apache.skywalking.oal.rt.parser.SourceColumnsFactory; +import org.apache.skywalking.oap.server.core.WorkPath; import org.apache.skywalking.oap.server.core.analysis.Stream; +import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener; import org.apache.skywalking.oap.server.core.oal.rt.OALCompileException; import org.apache.skywalking.oap.server.core.oal.rt.OALEngine; import org.apache.skywalking.oap.server.core.storage.annotation.Column; @@ -81,8 +86,11 @@ public class OALRuntime implements OALEngine { private static final String[] METRICS_CLASS_METHODS = {"id", "hashCode", "remoteHashCode", "equals", "serialize", "deserialize", "getMeta", "toDay"}; private final ClassPool classPool; + private ClassLoader currentClassLoader; private Configuration configuration; private AllDispatcherContext allDispatcherContext; + private StreamAnnotationListener streamAnnotationListener; + private final List<Class> metricsClasses; public OALRuntime() { classPool = ClassPool.getDefault(); @@ -90,9 +98,15 @@ public class OALRuntime implements OALEngine { configuration.setEncoding(Locale.ENGLISH, "UTF-8"); configuration.setClassLoaderForTemplateLoading(FileGenerator.class.getClassLoader(), "/code-templates"); allDispatcherContext = new AllDispatcherContext(); + metricsClasses = new ArrayList<>(); + } + + @Override public void setStreamListener(StreamAnnotationListener listener) throws ModuleStartException { + this.streamAnnotationListener = listener; } @Override public void start(ClassLoader currentClassLoader) throws ModuleStartException, OALCompileException { + this.currentClassLoader = currentClassLoader; Reader read; try { read = ResourceUtils.read("scope-meta.yml"); @@ -127,17 +141,22 @@ public class OALRuntime implements OALEngine { this.generateClassAtRuntime(oalScripts); } + @Override public void notifyAllListeners() throws ModuleStartException { + metricsClasses.forEach(streamAnnotationListener::notify); + } + private void generateClassAtRuntime(OALScripts oalScripts) throws OALCompileException { List<AnalysisResult> metricsStmts = oalScripts.getMetricsStmts(); metricsStmts.forEach(this::buildDispatcherContext); for (AnalysisResult metricsStmt : metricsStmts) { - generateMetricsClass(metricsStmt); + metricsClasses.add(generateMetricsClass(metricsStmt)); + generateMetricsBuilderClass(metricsStmt); } } - private void generateMetricsClass(AnalysisResult metricsStmt) throws OALCompileException { + private Class generateMetricsClass(AnalysisResult metricsStmt) throws OALCompileException { CtClass parentMetricsClass = null; try { parentMetricsClass = classPool.get(METRICS_FUNCTION_PACKAGE + metricsStmt.getMetricsClassName()); @@ -219,7 +238,6 @@ public class OALRuntime implements OALEngine { } } - /** * Add following annotation to the metrics class * @@ -235,14 +253,22 @@ public class OALRuntime implements OALEngine { annotationsAttribute.addAnnotation(streamAnnotation); metricsClassClassFile.addAttribute(annotationsAttribute); + Class targetClass; try { - metricsClass.toClass(); + targetClass = metricsClass.toClass(currentClassLoader, null); } catch (CannotCompileException e) { - logger.error("Can't compile " + metricsStmt.getMetricsName() + ".", e); + logger.error("Can't compile/load " + metricsStmt.getMetricsName() + ".", e); throw new OALCompileException(e.getMessage(), e); } - ClassFilePrinter.print(metricsClassClassFile); + logger.debug("Generate metrics class, " + metricsClass.getName()); + writeGeneratedFile(metricsClassClassFile, metricsClass.getSimpleName()); + + return targetClass; + } + + private void generateMetricsBuilderClass(AnalysisResult metricsStmt) throws OALCompileException { + } private String metricsClassName(AnalysisResult metricsStmt) { @@ -265,4 +291,31 @@ public class OALRuntime implements OALEngine { } context.getMetrics().add(metricsStmt); } + + private void writeGeneratedFile(ClassFile metricsClassClassFile, String className) throws OALCompileException { + PrintWriter printWriter = null; + try { + File workPath = WorkPath.getPath(); + File folder = new File(workPath.getParentFile(), "oal-rt"); + if (!folder.exists()) { + folder.mkdirs(); + } + File file = new File(folder, className + ".txt"); + if (file.exists()) { + file.delete(); + } + file.createNewFile(); + printWriter = new PrintWriter(file); + ClassFilePrinter.print(metricsClassClassFile, printWriter); + printWriter.flush(); + } catch (IOException e) { + logger.warn("Can't create " + className + ".txt, ignore.", e); + return; + } finally { + if (printWriter != null) { + printWriter.close(); + } + } + + } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java index adc3192..3fc1127 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java @@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.annotation.AnnotationScan; import org.apache.skywalking.oap.server.core.cache.*; import org.apache.skywalking.oap.server.core.cluster.*; import org.apache.skywalking.oap.server.core.config.*; +import org.apache.skywalking.oap.server.core.oal.rt.OALEngine; import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoader; import org.apache.skywalking.oap.server.core.query.*; import org.apache.skywalking.oap.server.core.register.service.*; @@ -57,6 +58,8 @@ public class CoreModuleProvider extends ModuleProvider { private final StorageModels storageModels; private final StreamDataMapping streamDataMapping; private final SourceReceiverImpl receiver; + private StreamAnnotationListener streamAnnotationListener; + private OALEngine oalEngine; public CoreModuleProvider() { super(); @@ -80,11 +83,16 @@ public class CoreModuleProvider extends ModuleProvider { } @Override public void prepare() throws ServiceNotProvidedException, ModuleStartException { + streamAnnotationListener = new StreamAnnotationListener(getManager()); + AnnotationScan scopeScan = new AnnotationScan(); scopeScan.registerListener(new DefaultScopeDefine.Listener()); try { scopeScan.scan(); - OALEngineLoader.get().start(getClass().getClassLoader()); + + oalEngine = OALEngineLoader.get(); + oalEngine.setStreamListener(streamAnnotationListener); + oalEngine.start(getClass().getClassLoader()); } catch (Exception e) { throw new ModuleStartException(e.getMessage(), e); } @@ -153,7 +161,7 @@ public class CoreModuleProvider extends ModuleProvider { this.registerServiceImplementation(AlarmQueryService.class, new AlarmQueryService(getManager())); this.registerServiceImplementation(TopNRecordsQueryService.class, new TopNRecordsQueryService(getManager())); - annotationScan.registerListener(new StreamAnnotationListener(getManager())); + annotationScan.registerListener(streamAnnotationListener); this.remoteClientManager = new RemoteClientManager(getManager()); this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager); @@ -166,8 +174,10 @@ public class CoreModuleProvider extends ModuleProvider { try { receiver.scan(); - annotationScan.scan(); + + oalEngine.notifyAllListeners(); + streamDataMapping.init(); } catch (IOException | IllegalAccessException | InstantiationException e) { throw new ModuleStartException(e.getMessage(), e); diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/boot/AgentPackagePath.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/WorkPath.java similarity index 60% copy from apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/boot/AgentPackagePath.java copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/WorkPath.java index a7574ef..d92d2ee 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/boot/AgentPackagePath.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/WorkPath.java @@ -16,38 +16,34 @@ * */ - -package org.apache.skywalking.apm.agent.core.boot; - -import java.net.URISyntaxException; -import org.apache.skywalking.apm.agent.core.logging.api.ILog; -import org.apache.skywalking.apm.agent.core.logging.api.LogManager; +package org.apache.skywalking.oap.server.core; import java.io.File; import java.net.MalformedURLException; +import java.net.URISyntaxException; import java.net.URL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** + * Locate the base work path of OAP backend. + * * @author wusheng */ -public class AgentPackagePath { - private static final ILog logger = LogManager.getLogger(AgentPackagePath.class); +public class WorkPath { + private static final Logger logger = LoggerFactory.getLogger(WorkPath.class); - private static File AGENT_PACKAGE_PATH; + private static File PATH; - public static File getPath() throws AgentPackageNotFoundException { - if (AGENT_PACKAGE_PATH == null) { - AGENT_PACKAGE_PATH = findPath(); + public static File getPath() { + if (PATH == null) { + PATH = findPath(); } - return AGENT_PACKAGE_PATH; - } - - public static boolean isPathFound() { - return AGENT_PACKAGE_PATH != null; + return PATH; } - private static File findPath() throws AgentPackageNotFoundException { - String classResourcePath = AgentPackagePath.class.getName().replaceAll("\\.", "/") + ".class"; + private static File findPath() { + String classResourcePath = WorkPath.class.getName().replaceAll("\\.", "/") + ".class"; URL resource = ClassLoader.getSystemClassLoader().getResource(classResourcePath); if (resource != null) { @@ -64,21 +60,20 @@ public class AgentPackagePath { try { agentJarFile = new File(new URL(urlString).toURI()); } catch (MalformedURLException e) { - logger.error(e, "Can not locate agent jar file by url:" + urlString); + throw new UnexpectedException("Can not locate oap core jar file by url:" + urlString, e); } catch (URISyntaxException e) { - logger.error(e, "Can not locate agent jar file by url:" + urlString); + throw new UnexpectedException("Can not locate oap core jar file by url:" + urlString, e); } if (agentJarFile.exists()) { return agentJarFile.getParentFile(); } } else { - String classLocation = urlString.substring(urlString.indexOf("file:"), urlString.length() - classResourcePath.length()); + int prefixLength = "file:".length(); + String classLocation = urlString.substring(prefixLength, urlString.length() - classResourcePath.length()); return new File(classLocation); } } - logger.error("Can not locate agent jar file."); - throw new AgentPackageNotFoundException("Can not locate agent jar file."); + throw new UnexpectedException("Can not locate oap core jar file by path:" + classResourcePath); } - } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java index f84ee9d..46e82f2 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java @@ -21,11 +21,16 @@ package org.apache.skywalking.oap.server.core.analysis; import com.google.common.collect.ImmutableSet; import com.google.common.reflect.ClassPath; import java.io.IOException; -import java.lang.reflect.*; -import java.util.*; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.skywalking.oap.server.core.UnexpectedException; import org.apache.skywalking.oap.server.core.source.Source; -import org.slf4j.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @author peng-yongsheng, wusheng @@ -75,39 +80,43 @@ public class DispatcherManager { for (ClassPath.ClassInfo classInfo : classes) { Class<?> aClass = classInfo.load(); - if (!aClass.isInterface() && SourceDispatcher.class.isAssignableFrom(aClass)) { - Type[] genericInterfaces = aClass.getGenericInterfaces(); - for (Type genericInterface : genericInterfaces) { - ParameterizedType anInterface = (ParameterizedType)genericInterface; - if (anInterface.getRawType().getTypeName().equals(SourceDispatcher.class.getName())) { - Type[] arguments = anInterface.getActualTypeArguments(); - - if (arguments.length != 1) { - throw new UnexpectedException("unexpected type argument number, class " + aClass.getName()); - } - Type argument = arguments[0]; + addIfAsSourceDispatcher(aClass); + } + } - Object source = ((Class)argument).newInstance(); + public void addIfAsSourceDispatcher(Class aClass) throws IllegalAccessException, InstantiationException { + if (!aClass.isInterface() && SourceDispatcher.class.isAssignableFrom(aClass)) { + Type[] genericInterfaces = aClass.getGenericInterfaces(); + for (Type genericInterface : genericInterfaces) { + ParameterizedType anInterface = (ParameterizedType)genericInterface; + if (anInterface.getRawType().getTypeName().equals(SourceDispatcher.class.getName())) { + Type[] arguments = anInterface.getActualTypeArguments(); - if (!Source.class.isAssignableFrom(source.getClass())) { - throw new UnexpectedException("unexpected type argument of class " + aClass.getName() + ", should be `org.apache.skywalking.oap.server.core.source.Source`. "); - } + if (arguments.length != 1) { + throw new UnexpectedException("unexpected type argument number, class " + aClass.getName()); + } + Type argument = arguments[0]; - Source dispatcherSource = (Source)source; - SourceDispatcher dispatcher = (SourceDispatcher)aClass.newInstance(); + Object source = ((Class)argument).newInstance(); - int scopeId = dispatcherSource.scope(); + if (!Source.class.isAssignableFrom(source.getClass())) { + throw new UnexpectedException("unexpected type argument of class " + aClass.getName() + ", should be `org.apache.skywalking.oap.server.core.source.Source`. "); + } - List<SourceDispatcher> dispatchers = this.dispatcherMap.get(scopeId); - if (dispatchers == null) { - dispatchers = new ArrayList<>(); - this.dispatcherMap.put(scopeId, dispatchers); - } + Source dispatcherSource = (Source)source; + SourceDispatcher dispatcher = (SourceDispatcher)aClass.newInstance(); - dispatchers.add(dispatcher); + int scopeId = dispatcherSource.scope(); - logger.info("Dispatcher {} is added into DefaultScopeDefine {}.", dispatcher.getClass().getName(), scopeId); + List<SourceDispatcher> dispatchers = this.dispatcherMap.get(scopeId); + if (dispatchers == null) { + dispatchers = new ArrayList<>(); + this.dispatcherMap.put(scopeId, dispatchers); } + + dispatchers.add(dispatcher); + + logger.info("Dispatcher {} is added into DefaultScopeDefine {}.", dispatcher.getClass().getName(), scopeId); } } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/oal/rt/OALEngine.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/oal/rt/OALEngine.java index 8d5703b..ae1a0dc 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/oal/rt/OALEngine.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/oal/rt/OALEngine.java @@ -18,6 +18,7 @@ package org.apache.skywalking.oap.server.core.oal.rt; +import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener; import org.apache.skywalking.oap.server.library.module.ModuleStartException; /** @@ -26,5 +27,9 @@ import org.apache.skywalking.oap.server.library.module.ModuleStartException; * @author wusheng */ public interface OALEngine { + void setStreamListener(StreamAnnotationListener listener) throws ModuleStartException; + void start(ClassLoader currentClassLoader) throws ModuleStartException, OALCompileException; + + void notifyAllListeners() throws ModuleStartException; }
