wu-sheng closed pull request #1961: Support ConnectionProxyInstance tracing
URL: https://github.com/apache/incubator-skywalking/pull/1961
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request was opened from a fork, the diff is supplied
below (GitHub would not otherwise show it):

diff --git 
a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/CreateJdbcConnectionProxyInstanceInterceptor.java
 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/CreateJdbcConnectionProxyInstanceInterceptor.java
new file mode 100644
index 000000000..270740db7
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/CreateJdbcConnectionProxyInstanceInterceptor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.jdbc.mysql;
+
+import com.mysql.cj.api.jdbc.JdbcConnection;
+import com.mysql.cj.core.conf.url.ConnectionUrl;
+import com.mysql.cj.core.conf.url.HostInfo;
+import java.lang.reflect.Method;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.StaticMethodsAroundInterceptor;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import 
org.apache.skywalking.apm.plugin.jdbc.mysql.wrapper.JdbcConnectionWrapper;
+import org.apache.skywalking.apm.plugin.jdbc.trace.ConnectionInfo;
+
+/**
+ * Static-method interceptor that replaces the connection returned by the intercepted factory method with a
+ * {@link JdbcConnectionWrapper} carrying a {@link ConnectionInfo} derived from the connection URL.
+ */
+public class CreateJdbcConnectionProxyInstanceInterceptor implements StaticMethodsAroundInterceptor {
+    /** Nothing to prepare before the intercepted method runs. */
+    @Override
+    public void beforeMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
+        MethodInterceptResult result) {
+    }
+
+    /**
+     * Wraps the created connection so it carries the peer/database description.
+     *
+     * @param allArguments arguments of the intercepted call; element 0 is cast to {@link ConnectionUrl}
+     * @param ret value returned by the intercepted method; cast to {@link JdbcConnection}
+     * @return a {@link JdbcConnectionWrapper} around {@code ret}
+     */
+    @Override
+    public Object afterMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
+        Object ret) {
+        ConnectionUrl connectionUrl = (ConnectionUrl)allArguments[0];
+
+        // Build "host:port,host:port"; the separator goes between entries only, so the
+        // peer string no longer ends with a dangling comma.
+        StringBuilder hosts = new StringBuilder();
+        for (HostInfo info : connectionUrl.getHostsList()) {
+            if (hosts.length() > 0) {
+                hosts.append(",");
+            }
+            hosts.append(info.getHost()).append(":").append(info.getPort());
+        }
+        ConnectionInfo connectionInfo = new ConnectionInfo(ComponentsDefine.MYSQL_JDBC_DRIVER, "Mysql",
+            hosts.toString(), connectionUrl.getDatabase());
+        return new JdbcConnectionWrapper((JdbcConnection)ret, connectionInfo);
+    }
+
+    /** Intentionally empty: this interceptor takes no action when the intercepted method throws. */
+    @Override
+    public void handleMethodException(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
+        Throwable t) {
+
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/CreateLoadBalancedConnectionProxyInstanceInterceptor.java
 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/CreateLoadBalancedConnectionProxyInstanceInterceptor.java
new file mode 100644
index 000000000..e2056108a
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/CreateLoadBalancedConnectionProxyInstanceInterceptor.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.jdbc.mysql;
+
+import com.mysql.cj.api.jdbc.ha.LoadBalancedConnection;
+import com.mysql.cj.core.conf.url.ConnectionUrl;
+import com.mysql.cj.core.conf.url.HostInfo;
+import java.lang.reflect.Method;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.StaticMethodsAroundInterceptor;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import 
org.apache.skywalking.apm.plugin.jdbc.mysql.wrapper.LoadBalancedConnectionWrapper;
+import org.apache.skywalking.apm.plugin.jdbc.trace.ConnectionInfo;
+
+/**
+ * Static-method interceptor that replaces the connection returned by the intercepted factory method with a
+ * {@link LoadBalancedConnectionWrapper} carrying a {@link ConnectionInfo} derived from the connection URL.
+ */
+public class CreateLoadBalancedConnectionProxyInstanceInterceptor implements StaticMethodsAroundInterceptor {
+    /** Nothing to prepare before the intercepted method runs. */
+    @Override public void beforeMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
+        MethodInterceptResult result) {
+
+    }
+
+    /**
+     * Wraps the created connection so it carries the peer/database description.
+     *
+     * @param allArguments arguments of the intercepted call; element 0 is cast to {@link ConnectionUrl}
+     * @param ret value returned by the intercepted method; cast to {@link LoadBalancedConnection}
+     * @return a {@link LoadBalancedConnectionWrapper} around {@code ret}
+     */
+    @Override public Object afterMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
+        Object ret) {
+        ConnectionUrl connectionUrl = (ConnectionUrl)allArguments[0];
+
+        // Build "host:port,host:port"; the separator goes between entries only, so the
+        // peer string no longer ends with a dangling comma.
+        StringBuilder hosts = new StringBuilder();
+        for (HostInfo info : connectionUrl.getHostsList()) {
+            if (hosts.length() > 0) {
+                hosts.append(",");
+            }
+            hosts.append(info.getHost()).append(":").append(info.getPort());
+        }
+        ConnectionInfo connectionInfo = new ConnectionInfo(ComponentsDefine.MYSQL_JDBC_DRIVER, "Mysql",
+            hosts.toString(), connectionUrl.getDatabase());
+        return new LoadBalancedConnectionWrapper((LoadBalancedConnection)ret, connectionInfo);
+    }
+
+    /** Intentionally empty: this interceptor takes no action when the intercepted method throws. */
+    @Override
+    public void handleMethodException(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
+        Throwable t) {
+
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/CreateReplicationConnectionProxyInstanceInterceptor.java
 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/CreateReplicationConnectionProxyInstanceInterceptor.java
new file mode 100644
index 000000000..d76edf316
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/CreateReplicationConnectionProxyInstanceInterceptor.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.jdbc.mysql;
+
+import com.mysql.cj.api.jdbc.ha.ReplicationConnection;
+import com.mysql.cj.core.conf.url.ConnectionUrl;
+import com.mysql.cj.core.conf.url.HostInfo;
+import java.lang.reflect.Method;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.StaticMethodsAroundInterceptor;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import 
org.apache.skywalking.apm.plugin.jdbc.mysql.wrapper.ReplicationConnectionWrapper;
+import org.apache.skywalking.apm.plugin.jdbc.trace.ConnectionInfo;
+
+/**
+ * Static-method interceptor that replaces the connection returned by the intercepted factory method with a
+ * {@link ReplicationConnectionWrapper} carrying a {@link ConnectionInfo} derived from the connection URL.
+ */
+public class CreateReplicationConnectionProxyInstanceInterceptor implements StaticMethodsAroundInterceptor {
+    /** Nothing to prepare before the intercepted method runs. */
+    @Override public void beforeMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
+        MethodInterceptResult result) {
+
+    }
+
+    /**
+     * Wraps the created connection so it carries the peer/database description.
+     *
+     * @param allArguments arguments of the intercepted call; element 0 is cast to {@link ConnectionUrl}
+     * @param ret value returned by the intercepted method; cast to {@link ReplicationConnection}
+     * @return a {@link ReplicationConnectionWrapper} around {@code ret}
+     */
+    @Override public Object afterMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
+        Object ret) {
+        ConnectionUrl connectionUrl = (ConnectionUrl)allArguments[0];
+
+        // Build "host:port,host:port"; the separator goes between entries only, so the
+        // peer string no longer ends with a dangling comma.
+        StringBuilder hosts = new StringBuilder();
+        for (HostInfo info : connectionUrl.getHostsList()) {
+            if (hosts.length() > 0) {
+                hosts.append(",");
+            }
+            hosts.append(info.getHost()).append(":").append(info.getPort());
+        }
+        ConnectionInfo connectionInfo = new ConnectionInfo(ComponentsDefine.MYSQL_JDBC_DRIVER, "Mysql",
+            hosts.toString(), connectionUrl.getDatabase());
+        return new ReplicationConnectionWrapper((ReplicationConnection)ret, connectionInfo);
+    }
+
+    /** Intentionally empty: this interceptor takes no action when the intercepted method throws. */
+    @Override
+    public void handleMethodException(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
+        Throwable t) {
+
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/define/FailoverConnectionProxyInstrumentation.java
 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/define/FailoverConnectionProxyInstrumentation.java
new file mode 100644
index 000000000..e71897c71
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/define/FailoverConnectionProxyInstrumentation.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.jdbc.mysql.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassStaticMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static 
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * Plugin definition that targets {@code com.mysql.cj.jdbc.ha.FailoverConnectionProxy} and routes every method
+ * named {@code createProxyInstance} to the interceptor named by {@link #METHOD_INTERCEPTOR}.
+ */
+public class FailoverConnectionProxyInstrumentation extends ClassStaticMethodsEnhancePluginDefine {
+
+    public static final String METHOD_INTERCEPTOR =
+        "org.apache.skywalking.apm.plugin.jdbc.mysql.CreateJdbcConnectionProxyInstanceInterceptor";
+    public static final String INTERCEPT_CLASS = "com.mysql.cj.jdbc.ha.FailoverConnectionProxy";
+
+    /** Selects the class to enhance by its exact name, {@link #INTERCEPT_CLASS}. */
+    @Override
+    protected ClassMatch enhanceClass() {
+        return byName(INTERCEPT_CLASS);
+    }
+
+    /** Declares the single intercept point: methods named {@code createProxyInstance}, arguments not overridden. */
+    @Override
+    protected StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {
+        final StaticMethodsInterceptPoint createProxyInstancePoint = new StaticMethodsInterceptPoint() {
+            @Override
+            public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                return named("createProxyInstance");
+            }
+
+            @Override
+            public String getMethodsInterceptor() {
+                return METHOD_INTERCEPTOR;
+            }
+
+            @Override
+            public boolean isOverrideArgs() {
+                return false;
+            }
+        };
+        return new StaticMethodsInterceptPoint[] {createProxyInstancePoint};
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/define/LoadBalancedConnectionProxyInstrumentation.java
 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/define/LoadBalancedConnectionProxyInstrumentation.java
new file mode 100644
index 000000000..a423f024c
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/define/LoadBalancedConnectionProxyInstrumentation.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.jdbc.mysql.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassStaticMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static 
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * Plugin definition that targets {@code com.mysql.cj.jdbc.ha.LoadBalancedConnectionProxy} and routes every method
+ * named {@code createProxyInstance} to the interceptor named by {@link #METHOD_INTERCEPTOR}.
+ */
+public class LoadBalancedConnectionProxyInstrumentation extends ClassStaticMethodsEnhancePluginDefine {
+
+    public static final String METHOD_INTERCEPTOR =
+        "org.apache.skywalking.apm.plugin.jdbc.mysql.CreateLoadBalancedConnectionProxyInstanceInterceptor";
+    public static final String INTERCEPT_CLASS = "com.mysql.cj.jdbc.ha.LoadBalancedConnectionProxy";
+
+    /** Selects the class to enhance by its exact name, {@link #INTERCEPT_CLASS}. */
+    @Override
+    protected ClassMatch enhanceClass() {
+        return byName(INTERCEPT_CLASS);
+    }
+
+    /** Declares the single intercept point: methods named {@code createProxyInstance}, arguments not overridden. */
+    @Override
+    protected StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {
+        final StaticMethodsInterceptPoint createProxyInstancePoint = new StaticMethodsInterceptPoint() {
+            @Override
+            public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                return named("createProxyInstance");
+            }
+
+            @Override
+            public String getMethodsInterceptor() {
+                return METHOD_INTERCEPTOR;
+            }
+
+            @Override
+            public boolean isOverrideArgs() {
+                return false;
+            }
+        };
+        return new StaticMethodsInterceptPoint[] {createProxyInstancePoint};
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/define/ReplicationConnectionProxyInstrumentation.java
 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/define/ReplicationConnectionProxyInstrumentation.java
new file mode 100644
index 000000000..fa96748e0
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/define/ReplicationConnectionProxyInstrumentation.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.jdbc.mysql.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassStaticMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static 
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * Plugin definition that targets {@code com.mysql.cj.jdbc.ha.ReplicationConnectionProxy} and routes every method
+ * named {@code createProxyInstance} to the interceptor named by {@link #METHOD_INTERCEPTOR}.
+ */
+public class ReplicationConnectionProxyInstrumentation extends ClassStaticMethodsEnhancePluginDefine {
+
+    public static final String METHOD_INTERCEPTOR =
+        "org.apache.skywalking.apm.plugin.jdbc.mysql.CreateReplicationConnectionProxyInstanceInterceptor";
+    public static final String INTERCEPT_CLASS = "com.mysql.cj.jdbc.ha.ReplicationConnectionProxy";
+
+    /** Selects the class to enhance by its exact name, {@link #INTERCEPT_CLASS}. */
+    @Override
+    protected ClassMatch enhanceClass() {
+        return byName(INTERCEPT_CLASS);
+    }
+
+    /** Declares the single intercept point: methods named {@code createProxyInstance}, arguments not overridden. */
+    @Override
+    protected StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {
+        final StaticMethodsInterceptPoint createProxyInstancePoint = new StaticMethodsInterceptPoint() {
+            @Override
+            public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                return named("createProxyInstance");
+            }
+
+            @Override
+            public String getMethodsInterceptor() {
+                return METHOD_INTERCEPTOR;
+            }
+
+            @Override
+            public boolean isOverrideArgs() {
+                return false;
+            }
+        };
+        return new StaticMethodsInterceptPoint[] {createProxyInstancePoint};
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/CallableStatementWrapper.java
 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/CallableStatementWrapper.java
new file mode 100644
index 000000000..e76ac5b82
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/CallableStatementWrapper.java
@@ -0,0 +1,539 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.jdbc.mysql.wrapper;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Date;
+import java.sql.NClob;
+import java.sql.Ref;
+import java.sql.RowId;
+import java.sql.SQLException;
+import java.sql.SQLType;
+import java.sql.SQLXML;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Calendar;
+import java.util.Map;
+import org.apache.skywalking.apm.plugin.jdbc.trace.ConnectionInfo;
+
+public class CallableStatementWrapper extends PreparedStatementWrapper 
implements CallableStatement {
+
+    /** Forwards to {@code call.registerOutParameter(parameterIndex, sqlType)} unchanged. */
+    @Override public void registerOutParameter(int parameterIndex, int 
sqlType) throws SQLException {
+        call.registerOutParameter(parameterIndex, sqlType);
+    }
+
+    /** Forwards to {@code call.registerOutParameter(parameterIndex, sqlType, scale)} unchanged. */
+    @Override public void registerOutParameter(int parameterIndex, int 
sqlType, int scale) throws SQLException {
+        call.registerOutParameter(parameterIndex, sqlType, scale);
+    }
+
+    /** Returns whatever {@code call.wasNull()} reports. */
+    @Override public boolean wasNull() throws SQLException {
+        return call.wasNull();
+    }
+
+    /** Returns the result of {@code call.getString(parameterIndex)}. */
+    @Override public String getString(int parameterIndex) throws SQLException {
+        return call.getString(parameterIndex);
+    }
+
+    /** Returns the result of {@code call.getBoolean(parameterIndex)}. */
+    @Override public boolean getBoolean(int parameterIndex) throws 
SQLException {
+        return call.getBoolean(parameterIndex);
+    }
+
+    /** Returns the result of {@code call.getByte(parameterIndex)}. */
+    @Override public byte getByte(int parameterIndex) throws SQLException {
+        return call.getByte(parameterIndex);
+    }
+
+    /** Returns the result of {@code call.getShort(parameterIndex)}. */
+    @Override public short getShort(int parameterIndex) throws SQLException {
+        return call.getShort(parameterIndex);
+    }
+
+    /** Returns the result of {@code call.getInt(parameterIndex)}. */
+    @Override public int getInt(int parameterIndex) throws SQLException {
+        return call.getInt(parameterIndex);
+    }
+
+    /** Returns the result of {@code call.getLong(parameterIndex)}. */
+    @Override public long getLong(int parameterIndex) throws SQLException {
+        return call.getLong(parameterIndex);
+    }
+
+    /** Returns the result of {@code call.getFloat(parameterIndex)}. */
+    @Override public float getFloat(int parameterIndex) throws SQLException {
+        return call.getFloat(parameterIndex);
+    }
+
+    /** Returns the result of {@code call.getDouble(parameterIndex)}. */
+    @Override public double getDouble(int parameterIndex) throws SQLException {
+        return call.getDouble(parameterIndex);
+    }
+
+    /** Returns the result of {@code call.getBigDecimal(parameterIndex, scale)}; marked {@code @Deprecated} here. */
+    @Override @Deprecated public BigDecimal getBigDecimal(int parameterIndex, 
int scale) throws SQLException {
+        return call.getBigDecimal(parameterIndex, scale);
+    }
+
+    /** Returns the result of {@code call.getBytes(parameterIndex)}. */
+    @Override public byte[] getBytes(int parameterIndex) throws SQLException {
+        return call.getBytes(parameterIndex);
+    }
+
+    /** Returns the result of {@code call.getDate(parameterIndex)}. */
+    @Override public Date getDate(int parameterIndex) throws SQLException {
+        return call.getDate(parameterIndex);
+    }
+
+    /** Returns the result of {@code call.getTime(parameterIndex)}. */
+    @Override public Time getTime(int parameterIndex) throws SQLException {
+        return call.getTime(parameterIndex);
+    }
+
+    /** Returns the result of {@code call.getTimestamp(parameterIndex)}. */
+    @Override public Timestamp getTimestamp(int parameterIndex) throws 
SQLException {
+        return call.getTimestamp(parameterIndex);
+    }
+
+    /** Returns the result of {@code call.getObject(parameterIndex)}. */
+    @Override public Object getObject(int parameterIndex) throws SQLException {
+        return call.getObject(parameterIndex);
+    }
+
+    /** Returns the result of {@code call.getBigDecimal(parameterIndex)}. */
+    @Override public BigDecimal getBigDecimal(int parameterIndex) throws 
SQLException {
+        return call.getBigDecimal(parameterIndex);
+    }
+
+    /** Returns the result of {@code call.getObject(parameterIndex, map)}. */
+    @Override public Object getObject(int parameterIndex, Map<String, 
Class<?>> map) throws SQLException {
+        return call.getObject(parameterIndex, map);
+    }
+
+    /** Returns the result of {@code call.getRef(parameterIndex)}. */
+    @Override public Ref getRef(int parameterIndex) throws SQLException {
+        return call.getRef(parameterIndex);
+    }
+
+    /** Returns the result of {@code call.getBlob(parameterIndex)}. */
+    @Override public Blob getBlob(int parameterIndex) throws SQLException {
+        return call.getBlob(parameterIndex);
+    }
+
+    /** Returns the result of {@code call.getClob(parameterIndex)}. */
+    @Override public Clob getClob(int parameterIndex) throws SQLException {
+        return call.getClob(parameterIndex);
+    }
+
+    /** Returns the result of {@code call.getArray(parameterIndex)}. */
+    @Override public Array getArray(int parameterIndex) throws SQLException {
+        return call.getArray(parameterIndex);
+    }
+
+    /** Returns the result of {@code call.getDate(parameterIndex, cal)}. */
+    @Override public Date getDate(int parameterIndex, Calendar cal) throws 
SQLException {
+        return call.getDate(parameterIndex, cal);
+    }
+
+    /** Returns the result of {@code call.getTime(parameterIndex, cal)}. */
+    @Override public Time getTime(int parameterIndex, Calendar cal) throws 
SQLException {
+        return call.getTime(parameterIndex, cal);
+    }
+
+    /** Returns the result of {@code call.getTimestamp(parameterIndex, cal)}. */
+    @Override public Timestamp getTimestamp(int parameterIndex, Calendar cal) 
throws SQLException {
+        return call.getTimestamp(parameterIndex, cal);
+    }
+
+    /** Forwards to {@code call.registerOutParameter(parameterIndex, sqlType, typeName)} unchanged. */
+    @Override public void registerOutParameter(int parameterIndex, int 
sqlType, String typeName) throws SQLException {
+        call.registerOutParameter(parameterIndex, sqlType, typeName);
+    }
+
+    /** Forwards to {@code call.registerOutParameter(parameterName, sqlType)} unchanged. */
+    @Override public void registerOutParameter(String parameterName, int 
sqlType) throws SQLException {
+        call.registerOutParameter(parameterName, sqlType);
+    }
+
+    /** Forwards to {@code call.registerOutParameter(parameterName, sqlType, scale)} unchanged. */
+    @Override public void registerOutParameter(String parameterName, int 
sqlType, int scale) throws SQLException {
+        call.registerOutParameter(parameterName, sqlType, scale);
+    }
+
+    /** Forwards to {@code call.registerOutParameter(parameterName, sqlType, typeName)} unchanged. */
+    @Override public void registerOutParameter(String parameterName, int 
sqlType, String typeName) throws SQLException {
+        call.registerOutParameter(parameterName, sqlType, typeName);
+    }
+
+    /** Returns the result of {@code call.getURL(parameterIndex)}. */
+    @Override public URL getURL(int parameterIndex) throws SQLException {
+        return call.getURL(parameterIndex);
+    }
+
+    /** Forwards to {@code call.setURL(parameterName, val)} unchanged. */
+    @Override public void setURL(String parameterName, URL val) throws 
SQLException {
+        call.setURL(parameterName, val);
+    }
+
+    /** Forwards to {@code call.setNull(parameterName, sqlType)} unchanged. */
+    @Override public void setNull(String parameterName, int sqlType) throws 
SQLException {
+        call.setNull(parameterName, sqlType);
+    }
+
+    /** Forwards to {@code call.setBoolean(parameterName, x)} unchanged. */
+    @Override public void setBoolean(String parameterName, boolean x) throws 
SQLException {
+        call.setBoolean(parameterName, x);
+    }
+
+    /** Forwards to {@code call.setByte(parameterName, x)} unchanged. */
+    @Override public void setByte(String parameterName, byte x) throws 
SQLException {
+        call.setByte(parameterName, x);
+    }
+
+    /** Forwards to {@code call.setShort(parameterName, x)} unchanged. */
+    @Override public void setShort(String parameterName, short x) throws 
SQLException {
+        call.setShort(parameterName, x);
+    }
+
+    /** Forwards to {@code call.setInt(parameterName, x)} unchanged. */
+    @Override public void setInt(String parameterName, int x) throws 
SQLException {
+        call.setInt(parameterName, x);
+    }
+
+    /** Forwards to {@code call.setLong(parameterName, x)} unchanged. */
+    @Override public void setLong(String parameterName, long x) throws 
SQLException {
+        call.setLong(parameterName, x);
+    }
+
+    /** Forwards to {@code call.setFloat(parameterName, x)} unchanged. */
+    @Override public void setFloat(String parameterName, float x) throws 
SQLException {
+        call.setFloat(parameterName, x);
+    }
+
+    /** Forwards to {@code call.setDouble(parameterName, x)} unchanged. */
+    @Override public void setDouble(String parameterName, double x) throws 
SQLException {
+        call.setDouble(parameterName, x);
+    }
+
+    /** Forwards to {@code call.setBigDecimal(parameterName, x)} unchanged. */
+    @Override public void setBigDecimal(String parameterName, BigDecimal x) 
throws SQLException {
+        call.setBigDecimal(parameterName, x);
+    }
+
+    /** Forwards to {@code call.setString(parameterName, x)} unchanged. */
+    @Override public void setString(String parameterName, String x) throws 
SQLException {
+        call.setString(parameterName, x);
+    }
+
+    /** Forwards to {@code call.setBytes(parameterName, x)} unchanged. */
+    @Override public void setBytes(String parameterName, byte[] x) throws 
SQLException {
+        call.setBytes(parameterName, x);
+    }
+
+    /** Forwards to {@code call.setDate(parameterName, x)} unchanged. */
+    @Override public void setDate(String parameterName, Date x) throws 
SQLException {
+        call.setDate(parameterName, x);
+    }
+
+    /** Forwards to {@code call.setTime(parameterName, x)} unchanged. */
+    @Override public void setTime(String parameterName, Time x) throws 
SQLException {
+        call.setTime(parameterName, x);
+    }
+
+    /** Forwards to {@code call.setTimestamp(parameterName, x)} unchanged. */
+    @Override public void setTimestamp(String parameterName, Timestamp x) 
throws SQLException {
+        call.setTimestamp(parameterName, x);
+    }
+
+    /** Forwards to {@code call.setAsciiStream(parameterName, x, length)} unchanged. */
+    @Override public void setAsciiStream(String parameterName, InputStream x, 
int length) throws SQLException {
+        call.setAsciiStream(parameterName, x, length);
+    }
+
+    /** Forwards to {@code call.setBinaryStream(parameterName, x, length)} unchanged. */
+    @Override public void setBinaryStream(String parameterName, InputStream x, 
int length) throws SQLException {
+        call.setBinaryStream(parameterName, x, length);
+    }
+
+    @Override public void setObject(String parameterName, Object x, int 
targetSqlType, int scale) throws SQLException {
+        call.setObject(parameterName, x, targetSqlType, scale);
+    }
+
+    @Override public void setObject(String parameterName, Object x, int 
targetSqlType) throws SQLException {
+        call.setObject(parameterName, x, targetSqlType);
+    }
+
+    @Override public void setObject(String parameterName, Object x) throws 
SQLException {
+        call.setObject(parameterName, x);
+    }
+
+    @Override public void setCharacterStream(String parameterName, Reader 
reader, int length) throws SQLException {
+        call.setCharacterStream(parameterName, reader, length);
+    }
+
+    @Override public void setDate(String parameterName, Date x, Calendar cal) 
throws SQLException {
+        call.setDate(parameterName, x, cal);
+    }
+
+    @Override public void setTime(String parameterName, Time x, Calendar cal) 
throws SQLException {
+        call.setTime(parameterName, x, cal);
+    }
+
+    @Override public void setTimestamp(String parameterName, Timestamp x, 
Calendar cal) throws SQLException {
+        call.setTimestamp(parameterName, x, cal);
+    }
+
+    @Override public void setNull(String parameterName, int sqlType, String 
typeName) throws SQLException {
+        call.setNull(parameterName, sqlType, typeName);
+    }
+
+    // Named-parameter getters. Each returns exactly what the wrapped statement
+    // returns for the same parameter name.
+
+    @Override
+    public String getString(String parameterName) throws SQLException {
+        return call.getString(parameterName);
+    }
+
+    @Override
+    public boolean getBoolean(String parameterName) throws SQLException {
+        return call.getBoolean(parameterName);
+    }
+
+    @Override
+    public byte getByte(String parameterName) throws SQLException {
+        return call.getByte(parameterName);
+    }
+
+    @Override
+    public short getShort(String parameterName) throws SQLException {
+        return call.getShort(parameterName);
+    }
+
+    @Override
+    public int getInt(String parameterName) throws SQLException {
+        return call.getInt(parameterName);
+    }
+
+    @Override
+    public long getLong(String parameterName) throws SQLException {
+        return call.getLong(parameterName);
+    }
+
+    @Override
+    public float getFloat(String parameterName) throws SQLException {
+        return call.getFloat(parameterName);
+    }
+
+    @Override
+    public double getDouble(String parameterName) throws SQLException {
+        return call.getDouble(parameterName);
+    }
+
+    @Override
+    public byte[] getBytes(String parameterName) throws SQLException {
+        return call.getBytes(parameterName);
+    }
+
+    @Override
+    public Date getDate(String parameterName) throws SQLException {
+        return call.getDate(parameterName);
+    }
+
+    @Override
+    public Time getTime(String parameterName) throws SQLException {
+        return call.getTime(parameterName);
+    }
+
+    @Override
+    public Timestamp getTimestamp(String parameterName) throws SQLException {
+        return call.getTimestamp(parameterName);
+    }
+
+    @Override
+    public Object getObject(String parameterName) throws SQLException {
+        return call.getObject(parameterName);
+    }
+
+    @Override
+    public BigDecimal getBigDecimal(String parameterName) throws SQLException {
+        return call.getBigDecimal(parameterName);
+    }
+
+    // Named-parameter getters for mapped objects, SQL reference/LOB/array types,
+    // calendar-qualified date/time values and URLs; all forwarded as-is.
+
+    @Override
+    public Object getObject(String parameterName, Map<String, Class<?>> map) throws SQLException {
+        return call.getObject(parameterName, map);
+    }
+
+    @Override
+    public Ref getRef(String parameterName) throws SQLException {
+        return call.getRef(parameterName);
+    }
+
+    @Override
+    public Blob getBlob(String parameterName) throws SQLException {
+        return call.getBlob(parameterName);
+    }
+
+    @Override
+    public Clob getClob(String parameterName) throws SQLException {
+        return call.getClob(parameterName);
+    }
+
+    @Override
+    public Array getArray(String parameterName) throws SQLException {
+        return call.getArray(parameterName);
+    }
+
+    @Override
+    public Date getDate(String parameterName, Calendar cal) throws SQLException {
+        return call.getDate(parameterName, cal);
+    }
+
+    @Override
+    public Time getTime(String parameterName, Calendar cal) throws SQLException {
+        return call.getTime(parameterName, cal);
+    }
+
+    @Override
+    public Timestamp getTimestamp(String parameterName, Calendar cal) throws SQLException {
+        return call.getTimestamp(parameterName, cal);
+    }
+
+    @Override
+    public URL getURL(String parameterName) throws SQLException {
+        return call.getURL(parameterName);
+    }
+
+    // RowId, national-character, LOB and SQLXML accessors. Index-based and
+    // name-based variants both forward to the matching overload of the wrapped statement.
+
+    @Override
+    public RowId getRowId(int parameterIndex) throws SQLException {
+        return call.getRowId(parameterIndex);
+    }
+
+    @Override
+    public RowId getRowId(String parameterName) throws SQLException {
+        return call.getRowId(parameterName);
+    }
+
+    @Override
+    public void setRowId(String parameterName, RowId x) throws SQLException {
+        call.setRowId(parameterName, x);
+    }
+
+    @Override
+    public void setNString(String parameterName, String value) throws SQLException {
+        call.setNString(parameterName, value);
+    }
+
+    @Override
+    public void setNCharacterStream(String parameterName, Reader value, long length) throws SQLException {
+        call.setNCharacterStream(parameterName, value, length);
+    }
+
+    @Override
+    public void setNClob(String parameterName, NClob value) throws SQLException {
+        call.setNClob(parameterName, value);
+    }
+
+    @Override
+    public void setClob(String parameterName, Reader reader, long length) throws SQLException {
+        call.setClob(parameterName, reader, length);
+    }
+
+    @Override
+    public void setBlob(String parameterName, InputStream inputStream, long length) throws SQLException {
+        call.setBlob(parameterName, inputStream, length);
+    }
+
+    @Override
+    public void setNClob(String parameterName, Reader reader, long length) throws SQLException {
+        call.setNClob(parameterName, reader, length);
+    }
+
+    @Override
+    public NClob getNClob(int parameterIndex) throws SQLException {
+        return call.getNClob(parameterIndex);
+    }
+
+    @Override
+    public NClob getNClob(String parameterName) throws SQLException {
+        return call.getNClob(parameterName);
+    }
+
+    @Override
+    public void setSQLXML(String parameterName, SQLXML xmlObject) throws SQLException {
+        call.setSQLXML(parameterName, xmlObject);
+    }
+
+    // SQLXML, national-string and character-stream getters, by index and by name.
+    // The wrapped statement's result is returned untouched.
+
+    @Override
+    public SQLXML getSQLXML(int parameterIndex) throws SQLException {
+        return call.getSQLXML(parameterIndex);
+    }
+
+    @Override
+    public SQLXML getSQLXML(String parameterName) throws SQLException {
+        return call.getSQLXML(parameterName);
+    }
+
+    @Override
+    public String getNString(int parameterIndex) throws SQLException {
+        return call.getNString(parameterIndex);
+    }
+
+    @Override
+    public String getNString(String parameterName) throws SQLException {
+        return call.getNString(parameterName);
+    }
+
+    @Override
+    public Reader getNCharacterStream(int parameterIndex) throws SQLException {
+        return call.getNCharacterStream(parameterIndex);
+    }
+
+    @Override
+    public Reader getNCharacterStream(String parameterName) throws SQLException {
+        return call.getNCharacterStream(parameterName);
+    }
+
+    @Override
+    public Reader getCharacterStream(int parameterIndex) throws SQLException {
+        return call.getCharacterStream(parameterIndex);
+    }
+
+    @Override
+    public Reader getCharacterStream(String parameterName) throws SQLException {
+        return call.getCharacterStream(parameterName);
+    }
+
+    // LOB and stream setters: object-valued, long-length and length-less variants.
+    // Every call is passed straight through to the wrapped statement.
+
+    @Override
+    public void setBlob(String parameterName, Blob x) throws SQLException {
+        call.setBlob(parameterName, x);
+    }
+
+    @Override
+    public void setClob(String parameterName, Clob x) throws SQLException {
+        call.setClob(parameterName, x);
+    }
+
+    @Override
+    public void setAsciiStream(String parameterName, InputStream x, long length) throws SQLException {
+        call.setAsciiStream(parameterName, x, length);
+    }
+
+    @Override
+    public void setBinaryStream(String parameterName, InputStream x, long length) throws SQLException {
+        call.setBinaryStream(parameterName, x, length);
+    }
+
+    @Override
+    public void setCharacterStream(String parameterName, Reader reader, long length) throws SQLException {
+        call.setCharacterStream(parameterName, reader, length);
+    }
+
+    @Override
+    public void setAsciiStream(String parameterName, InputStream x) throws SQLException {
+        call.setAsciiStream(parameterName, x);
+    }
+
+    @Override
+    public void setBinaryStream(String parameterName, InputStream x) throws SQLException {
+        call.setBinaryStream(parameterName, x);
+    }
+
+    @Override
+    public void setCharacterStream(String parameterName, Reader reader) throws SQLException {
+        call.setCharacterStream(parameterName, reader);
+    }
+
+    @Override
+    public void setNCharacterStream(String parameterName, Reader value) throws SQLException {
+        call.setNCharacterStream(parameterName, value);
+    }
+
+    @Override
+    public void setClob(String parameterName, Reader reader) throws SQLException {
+        call.setClob(parameterName, reader);
+    }
+
+    @Override
+    public void setBlob(String parameterName, InputStream inputStream) throws SQLException {
+        call.setBlob(parameterName, inputStream);
+    }
+
+    @Override
+    public void setNClob(String parameterName, Reader reader) throws SQLException {
+        call.setNClob(parameterName, reader);
+    }
+
+    @Override public <T> T getObject(int parameterIndex, Class<T> type) throws 
SQLException {
+        return call.getObject(parameterIndex, type);
+    }
+
+    @Override public <T> T getObject(String parameterName, Class<T> type) 
throws SQLException {
+        return call.getObject(parameterName, type);
+    }
+
+    @Override public void setObject(String parameterName, Object x, SQLType 
targetSqlType,
+        int scaleOrLength) throws SQLException {
+        call.setObject(parameterName, x, targetSqlType, scaleOrLength);
+    }
+
+    @Override public void setObject(String parameterName, Object x, SQLType 
targetSqlType) throws SQLException {
+        call.setObject(parameterName, x, targetSqlType);
+    }
+
+    @Override public void registerOutParameter(int parameterIndex, SQLType 
sqlType) throws SQLException {
+        call.registerOutParameter(parameterIndex, sqlType);
+    }
+
+    @Override public void registerOutParameter(int parameterIndex, SQLType 
sqlType, int scale) throws SQLException {
+        call.registerOutParameter(parameterIndex, sqlType, scale);
+    }
+
+    @Override
+    public void registerOutParameter(int parameterIndex, SQLType sqlType, 
String typeName) throws SQLException {
+        call.registerOutParameter(parameterIndex, sqlType, typeName);
+    }
+
+    @Override public void registerOutParameter(String parameterName, SQLType 
sqlType) throws SQLException {
+        call.registerOutParameter(parameterName, sqlType);
+    }
+
+    @Override public void registerOutParameter(String parameterName, SQLType 
sqlType, int scale) throws SQLException {
+        call.registerOutParameter(parameterName, sqlType, scale);
+    }
+
+    @Override
+    public void registerOutParameter(String parameterName, SQLType sqlType, 
String typeName) throws SQLException {
+        call.registerOutParameter(parameterName, sqlType, typeName);
+    }
+
+    /** Wrapped statement that the CallableStatement methods of this class forward to. */
+    private final CallableStatement call;
+    /**
+     * SQL text supplied at construction; the same value is also passed to the superclass.
+     * NOTE(review): no read of this field is visible in this part of the class - confirm it is needed.
+     */
+    private final String sql;
+
+    /**
+     * Creates a wrapper around an already prepared callable statement.
+     *
+     * @param call           statement to delegate to
+     * @param connectionInfo connection description, handed to the superclass
+     * @param sql            SQL text the statement was prepared with
+     */
+    public CallableStatementWrapper(CallableStatement call, ConnectionInfo connectionInfo, String sql) {
+        // The superclass receives the same statement and SQL plus the literal type tag "Callable".
+        super(call, connectionInfo, sql, "Callable");
+        this.sql = sql;
+        this.call = call;
+    }
+
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/JdbcConnectionWrapper.java
 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/JdbcConnectionWrapper.java
new file mode 100644
index 000000000..5b2ab6486
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/JdbcConnectionWrapper.java
@@ -0,0 +1,652 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.jdbc.mysql.wrapper;
+
+import com.mysql.cj.api.exceptions.ExceptionInterceptor;
+import com.mysql.cj.api.jdbc.ClientInfoProvider;
+import com.mysql.cj.api.jdbc.JdbcConnection;
+import com.mysql.cj.api.jdbc.JdbcPropertySet;
+import com.mysql.cj.api.jdbc.interceptors.StatementInterceptor;
+import com.mysql.cj.api.jdbc.result.ResultSetInternalMethods;
+import com.mysql.cj.api.mysqla.io.PacketPayload;
+import com.mysql.cj.api.mysqla.result.ColumnDefinition;
+import com.mysql.cj.core.ServerVersion;
+import com.mysql.cj.jdbc.ServerPreparedStatement;
+import com.mysql.cj.jdbc.StatementImpl;
+import com.mysql.cj.jdbc.result.CachedResultSetMetaData;
+import com.mysql.cj.mysqla.MysqlaSession;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.DatabaseMetaData;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Timer;
+import java.util.concurrent.Executor;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.plugin.jdbc.trace.ConnectionInfo;
+
+public class JdbcConnectionWrapper implements JdbcConnection, EnhancedInstance 
{
+
+    /**
+     * Creates a wrapper around an existing MySQL connection.
+     *
+     * @param delegate       connection that receives every forwarded call
+     * @param connectionInfo connection description passed on to the statement wrappers this class creates
+     */
+    public JdbcConnectionWrapper(JdbcConnection delegate, ConnectionInfo connectionInfo) {
+        this.connectionInfo = connectionInfo;
+        this.delegate = delegate;
+    }
+
+    // Session / user accessors: forwarded unchanged to the wrapped connection.
+    // @Override makes the compiler verify each signature against the implemented interface.
+
+    @Override
+    public JdbcPropertySet getPropertySet() {
+        return delegate.getPropertySet();
+    }
+
+    @Override
+    public MysqlaSession getSession() {
+        return delegate.getSession();
+    }
+
+    @Override
+    public void changeUser(String s, String s1) throws SQLException {
+        delegate.changeUser(s, s1);
+    }
+
+    @Deprecated
+    @Override
+    public void clearHasTriedMaster() {
+        delegate.clearHasTriedMaster();
+    }
+
+    // Client-side prepared statement factories: each overload forwards its arguments
+    // to the wrapped connection and returns the driver's statement as-is.
+    // NOTE(review): unlike prepareStatement(...) in this class, the result is not wrapped
+    // in a PreparedStatementWrapper - confirm that this is intended.
+
+    @Override
+    public PreparedStatement clientPrepareStatement(String s) throws SQLException {
+        return delegate.clientPrepareStatement(s);
+    }
+
+    @Override
+    public PreparedStatement clientPrepareStatement(String s, int i) throws SQLException {
+        return delegate.clientPrepareStatement(s, i);
+    }
+
+    @Override
+    public PreparedStatement clientPrepareStatement(String s, int i, int i1) throws SQLException {
+        return delegate.clientPrepareStatement(s, i, i1);
+    }
+
+    @Override
+    public PreparedStatement clientPrepareStatement(String s, int[] ints) throws SQLException {
+        return delegate.clientPrepareStatement(s, ints);
+    }
+
+    @Override
+    public PreparedStatement clientPrepareStatement(String s, int i, int i1, int i2) throws SQLException {
+        return delegate.clientPrepareStatement(s, i, i1, i2);
+    }
+
+    @Override
+    public PreparedStatement clientPrepareStatement(String s, String[] strings) throws SQLException {
+        return delegate.clientPrepareStatement(s, strings);
+    }
+
+    // Connection state queries and maintenance calls: pure pass-throughs to the wrapped
+    // connection. @Override lets the compiler check them against the implemented interface.
+
+    @Override
+    public int getActiveStatementCount() {
+        return delegate.getActiveStatementCount();
+    }
+
+    @Override
+    public long getIdleFor() {
+        return delegate.getIdleFor();
+    }
+
+    @Override
+    public String getStatementComment() {
+        return delegate.getStatementComment();
+    }
+
+    @Deprecated
+    @Override
+    public boolean hasTriedMaster() {
+        return delegate.hasTriedMaster();
+    }
+
+    @Override
+    public boolean isInGlobalTx() {
+        return delegate.isInGlobalTx();
+    }
+
+    @Override
+    public void setInGlobalTx(boolean b) {
+        delegate.setInGlobalTx(b);
+    }
+
+    @Override
+    public boolean isMasterConnection() {
+        return delegate.isMasterConnection();
+    }
+
+    @Override
+    public boolean isNoBackslashEscapesSet() {
+        return delegate.isNoBackslashEscapesSet();
+    }
+
+    @Override
+    public boolean isSameResource(JdbcConnection connection) {
+        return delegate.isSameResource(connection);
+    }
+
+    @Override
+    public boolean lowerCaseTableNames() {
+        return delegate.lowerCaseTableNames();
+    }
+
+    @Override
+    public void ping() throws SQLException {
+        delegate.ping();
+    }
+
+    @Override
+    public void resetServerState() throws SQLException {
+        delegate.resetServerState();
+    }
+
+    // Server-side prepared statement factories: each overload forwards its arguments
+    // to the wrapped connection and returns the driver's statement as-is.
+    // NOTE(review): unlike prepareStatement(...) in this class, the result is not wrapped
+    // in a PreparedStatementWrapper - confirm that this is intended.
+
+    @Override
+    public PreparedStatement serverPrepareStatement(String s) throws SQLException {
+        return delegate.serverPrepareStatement(s);
+    }
+
+    @Override
+    public PreparedStatement serverPrepareStatement(String s, int i) throws SQLException {
+        return delegate.serverPrepareStatement(s, i);
+    }
+
+    @Override
+    public PreparedStatement serverPrepareStatement(String s, int i, int i1) throws SQLException {
+        return delegate.serverPrepareStatement(s, i, i1);
+    }
+
+    @Override
+    public PreparedStatement serverPrepareStatement(String s, int i, int i1, int i2) throws SQLException {
+        return delegate.serverPrepareStatement(s, i, i1, i2);
+    }
+
+    @Override
+    public PreparedStatement serverPrepareStatement(String s, int[] ints) throws SQLException {
+        return delegate.serverPrepareStatement(s, ints);
+    }
+
+    @Override
+    public PreparedStatement serverPrepareStatement(String s, String[] strings) throws SQLException {
+        return delegate.serverPrepareStatement(s, strings);
+    }
+
+    // Fail-over, host, schema and proxy related calls: forwarded unchanged to the wrapped
+    // connection. @Override lets the compiler check them against the implemented interface.
+
+    @Override
+    public void setFailedOver(boolean b) {
+        delegate.setFailedOver(b);
+    }
+
+    @Override
+    public void setStatementComment(String s) {
+        delegate.setStatementComment(s);
+    }
+
+    @Override
+    public void shutdownServer() throws SQLException {
+        delegate.shutdownServer();
+    }
+
+    @Override
+    public void reportQueryTime(long l) {
+        delegate.reportQueryTime(l);
+    }
+
+    @Override
+    public boolean isAbonormallyLongQuery(long l) {
+        return delegate.isAbonormallyLongQuery(l);
+    }
+
+    @Override
+    public int getAutoIncrementIncrement() {
+        return delegate.getAutoIncrementIncrement();
+    }
+
+    @Override
+    public boolean hasSameProperties(JdbcConnection connection) {
+        return delegate.hasSameProperties(connection);
+    }
+
+    @Override
+    public String getHost() {
+        return delegate.getHost();
+    }
+
+    @Override
+    public String getHostPortPair() {
+        return delegate.getHostPortPair();
+    }
+
+    @Override
+    public void setProxy(JdbcConnection connection) {
+        delegate.setProxy(connection);
+    }
+
+    @Override
+    public boolean isServerLocal() throws SQLException {
+        return delegate.isServerLocal();
+    }
+
+    @Override
+    public int getSessionMaxRows() {
+        return delegate.getSessionMaxRows();
+    }
+
+    @Override
+    public void setSessionMaxRows(int i) throws SQLException {
+        delegate.setSessionMaxRows(i);
+    }
+
+    @Override
+    public void setSchema(String s) throws SQLException {
+        delegate.setSchema(s);
+    }
+
+    @Override
+    public void abortInternal() throws SQLException {
+        delegate.abortInternal();
+    }
+
+    @Override
+    public void checkClosed() {
+        delegate.checkClosed();
+    }
+
+    @Override
+    public boolean isProxySet() {
+        return delegate.isProxySet();
+    }
+
+    // NOTE(review): the duplicate is returned as produced by the wrapped connection,
+    // i.e. not wrapped in a JdbcConnectionWrapper - confirm that this is intended.
+    @Override
+    public JdbcConnection duplicate() throws SQLException {
+        return delegate.duplicate();
+    }
+
+    // Low-level SQL execution entry points of the driver; both overloads hand every
+    // argument, in order, to the wrapped connection and return its result set.
+
+    @Override
+    public ResultSetInternalMethods execSQL(StatementImpl statement,
+        String s, int i, PacketPayload payload, boolean b, String s1,
+        ColumnDefinition definition) throws SQLException {
+        return delegate.execSQL(statement, s, i, payload, b, s1, definition);
+    }
+
+    @Override
+    public ResultSetInternalMethods execSQL(StatementImpl statement,
+        String s, int i, PacketPayload payload, boolean b, String s1,
+        ColumnDefinition definition, boolean b1) throws SQLException {
+        return delegate.execSQL(statement, s, i, payload, b, s1, definition, b1);
+    }
+
+    // Driver-internal metadata, caching and statistics hooks: forwarded unchanged to the
+    // wrapped connection. @Override lets the compiler check them against the interface.
+
+    @Override
+    public StringBuilder generateConnectionCommentBlock(StringBuilder builder) {
+        return delegate.generateConnectionCommentBlock(builder);
+    }
+
+    @Override
+    public CachedResultSetMetaData getCachedMetaData(String s) {
+        return delegate.getCachedMetaData(s);
+    }
+
+    @Override
+    public Timer getCancelTimer() {
+        return delegate.getCancelTimer();
+    }
+
+    @Override
+    public String getCharacterSetMetadata() {
+        return delegate.getCharacterSetMetadata();
+    }
+
+    // NOTE(review): returned without a StatementWrapper, unlike createStatement() - confirm intended.
+    @Override
+    public Statement getMetadataSafeStatement() throws SQLException {
+        return delegate.getMetadataSafeStatement();
+    }
+
+    @Override
+    public boolean getRequiresEscapingEncoder() {
+        return delegate.getRequiresEscapingEncoder();
+    }
+
+    @Override
+    public ServerVersion getServerVersion() {
+        return delegate.getServerVersion();
+    }
+
+    @Override
+    public List<StatementInterceptor> getStatementInterceptorsInstances() {
+        return delegate.getStatementInterceptorsInstances();
+    }
+
+    @Override
+    public void incrementNumberOfPreparedExecutes() {
+        delegate.incrementNumberOfPreparedExecutes();
+    }
+
+    @Override
+    public void incrementNumberOfPrepares() {
+        delegate.incrementNumberOfPrepares();
+    }
+
+    @Override
+    public void incrementNumberOfResultSetsCreated() {
+        delegate.incrementNumberOfResultSetsCreated();
+    }
+
+    @Override
+    public void initializeResultsMetadataFromCache(String s, CachedResultSetMetaData data,
+        ResultSetInternalMethods methods) throws SQLException {
+        delegate.initializeResultsMetadataFromCache(s, data, methods);
+    }
+
+    @Override
+    public void initializeSafeStatementInterceptors() throws SQLException {
+        delegate.initializeSafeStatementInterceptors();
+    }
+
+    public boolean isReadInfoMsgEnabled() {
+        return delegate.isReadInfoMsgEnabled();
+    }
+
+    public boolean isReadOnly(boolean b) throws SQLException {
+        return delegate.isReadOnly(b);
+    }
+
+    public void pingInternal(boolean b, int i) throws SQLException {
+        delegate.pingInternal(b, i);
+    }
+
+    public void realClose(boolean b, boolean b1, boolean b2, Throwable 
throwable) throws SQLException {
+        delegate.realClose(b, b1, b2, throwable);
+    }
+
+    public void recachePreparedStatement(ServerPreparedStatement statement) 
throws SQLException {
+        delegate.recachePreparedStatement(statement);
+    }
+
+    public void decachePreparedStatement(ServerPreparedStatement statement) 
throws SQLException {
+        delegate.decachePreparedStatement(statement);
+    }
+
+    public void registerQueryExecutionTime(long l) {
+        delegate.registerQueryExecutionTime(l);
+    }
+
+    public void registerStatement(com.mysql.cj.api.jdbc.Statement statement) {
+        delegate.registerStatement(statement);
+    }
+
+    public void reportNumberOfTablesAccessed(int i) {
+        delegate.reportNumberOfTablesAccessed(i);
+    }
+
+    public void setReadInfoMsgEnabled(boolean b) {
+        delegate.setReadInfoMsgEnabled(b);
+    }
+
+    public void setReadOnlyInternal(boolean b) throws SQLException {
+        delegate.setReadOnlyInternal(b);
+    }
+
+    public boolean storesLowerCaseTableName() {
+        return delegate.storesLowerCaseTableName();
+    }
+
+    public void throwConnectionClosedException() throws SQLException {
+        delegate.throwConnectionClosedException();
+    }
+
+    public void transactionBegun() throws SQLException {
+        delegate.transactionBegun();
+    }
+
+    public void transactionCompleted() throws SQLException {
+        delegate.transactionCompleted();
+    }
+
+    public void unregisterStatement(com.mysql.cj.api.jdbc.Statement statement) 
{
+        delegate.unregisterStatement(statement);
+    }
+
+    public void unSafeStatementInterceptors() throws SQLException {
+        delegate.unSafeStatementInterceptors();
+    }
+
+    public boolean useAnsiQuotedIdentifiers() {
+        return delegate.useAnsiQuotedIdentifiers();
+    }
+
+    public JdbcConnection getMultiHostSafeProxy() {
+        return delegate.getMultiHostSafeProxy();
+    }
+
+    public ClientInfoProvider getClientInfoProviderImpl() throws SQLException {
+        return delegate.getClientInfoProviderImpl();
+    }
+
+    /**
+     * Creates a statement on the wrapped connection and returns it inside a
+     * {@link StatementWrapper} that also carries this wrapper's connection info.
+     */
+    @Override
+    public Statement createStatement() throws SQLException {
+        return new StatementWrapper(delegate.createStatement(), connectionInfo);
+    }
+
+    /**
+     * Prepares {@code sql} on the wrapped connection and returns the result inside a
+     * {@link PreparedStatementWrapper} that also receives the connection info and the SQL text.
+     */
+    @Override
+    public PreparedStatement prepareStatement(String sql) throws SQLException {
+        return new PreparedStatementWrapper(delegate.prepareStatement(sql), connectionInfo, sql);
+    }
+
+    /**
+     * Prepares the call {@code sql} on the wrapped connection and returns the result inside a
+     * {@link CallableStatementWrapper} that also receives the connection info and the SQL text.
+     */
+    @Override
+    public CallableStatement prepareCall(String sql) throws SQLException {
+        return new CallableStatementWrapper(delegate.prepareCall(sql), connectionInfo, sql);
+    }
+
+    // Standard java.sql.Connection transaction, state and metadata methods: forwarded
+    // unchanged to the wrapped connection. @Override lets the compiler check the signatures.
+
+    @Override
+    public String nativeSQL(String sql) throws SQLException {
+        return delegate.nativeSQL(sql);
+    }
+
+    @Override
+    public void setAutoCommit(boolean autoCommit) throws SQLException {
+        delegate.setAutoCommit(autoCommit);
+    }
+
+    @Override
+    public boolean getAutoCommit() throws SQLException {
+        return delegate.getAutoCommit();
+    }
+
+    @Override
+    public void commit() throws SQLException {
+        delegate.commit();
+    }
+
+    @Override
+    public void rollback() throws SQLException {
+        delegate.rollback();
+    }
+
+    @Override
+    public void close() throws SQLException {
+        delegate.close();
+    }
+
+    @Override
+    public boolean isClosed() throws SQLException {
+        return delegate.isClosed();
+    }
+
+    @Override
+    public DatabaseMetaData getMetaData() throws SQLException {
+        return delegate.getMetaData();
+    }
+
+    @Override
+    public void setReadOnly(boolean readOnly) throws SQLException {
+        delegate.setReadOnly(readOnly);
+    }
+
+    @Override
+    public boolean isReadOnly() throws SQLException {
+        return delegate.isReadOnly();
+    }
+
+    @Override
+    public void setCatalog(String catalog) throws SQLException {
+        delegate.setCatalog(catalog);
+    }
+
+    @Override
+    public String getCatalog() throws SQLException {
+        return delegate.getCatalog();
+    }
+
+    @Override
+    public void setTransactionIsolation(int level) throws SQLException {
+        delegate.setTransactionIsolation(level);
+    }
+
+    @Override
+    public int getTransactionIsolation() throws SQLException {
+        return delegate.getTransactionIsolation();
+    }
+
+    @Override
+    public SQLWarning getWarnings() throws SQLException {
+        return delegate.getWarnings();
+    }
+
+    @Override
+    public void clearWarnings() throws SQLException {
+        delegate.clearWarnings();
+    }
+
+    public Statement createStatement(int resultSetType, int 
resultSetConcurrency) throws SQLException {
+        return new StatementWrapper(delegate.createStatement(resultSetType, 
resultSetConcurrency), connectionInfo);
+    }
+
+    public PreparedStatement prepareStatement(String sql, int resultSetType,
+        int resultSetConcurrency) throws SQLException {
+        return new PreparedStatementWrapper(delegate.prepareStatement(sql, 
resultSetType, resultSetConcurrency), connectionInfo, sql);
+    }
+
+    public CallableStatement prepareCall(String sql, int resultSetType, int 
resultSetConcurrency) throws SQLException {
+        return new CallableStatementWrapper(delegate.prepareCall(sql, 
resultSetType, resultSetConcurrency), connectionInfo, sql);
+    }
+
+    // Type-map, holdability and savepoint methods: forwarded unchanged to the wrapped
+    // connection. @Override lets the compiler check the signatures.
+
+    @Override
+    public Map<String, Class<?>> getTypeMap() throws SQLException {
+        return delegate.getTypeMap();
+    }
+
+    @Override
+    public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+        delegate.setTypeMap(map);
+    }
+
+    @Override
+    public void setHoldability(int holdability) throws SQLException {
+        delegate.setHoldability(holdability);
+    }
+
+    @Override
+    public int getHoldability() throws SQLException {
+        return delegate.getHoldability();
+    }
+
+    @Override
+    public Savepoint setSavepoint() throws SQLException {
+        return delegate.setSavepoint();
+    }
+
+    @Override
+    public Savepoint setSavepoint(String name) throws SQLException {
+        return delegate.setSavepoint(name);
+    }
+
+    @Override
+    public void rollback(Savepoint savepoint) throws SQLException {
+        delegate.rollback(savepoint);
+    }
+
+    @Override
+    public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+        delegate.releaseSavepoint(savepoint);
+    }
+
+    public Statement createStatement(int resultSetType, int 
resultSetConcurrency,
+        int resultSetHoldability) throws SQLException {
+        return new StatementWrapper(delegate.createStatement(resultSetType, 
resultSetConcurrency, resultSetHoldability), connectionInfo);
+    }
+
+    /**
+     * Prepares {@code sql} on the wrapped connection with explicit result set
+     * type, concurrency and holdability, then returns it inside a
+     * {@link PreparedStatementWrapper} that also receives this wrapper's
+     * connection info and the SQL text.
+     */
+    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency,
+        int resultSetHoldability) throws SQLException {
+        final PreparedStatement prepared = this.delegate.prepareStatement(
+            sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+        return new PreparedStatementWrapper(prepared, this.connectionInfo, sql);
+    }
+
+    /**
+     * Prepares a callable statement on the wrapped connection with explicit
+     * result set type, concurrency and holdability, then returns it inside a
+     * {@link CallableStatementWrapper} that also receives this wrapper's
+     * connection info and the SQL text.
+     */
+    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency,
+        int resultSetHoldability) throws SQLException {
+        final CallableStatement callable = this.delegate.prepareCall(
+            sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+        return new CallableStatementWrapper(callable, this.connectionInfo, sql);
+    }
+
+    /**
+     * Prepares {@code sql} on the wrapped connection with the given
+     * auto-generated-keys flag and returns it inside a
+     * {@link PreparedStatementWrapper}.
+     */
+    public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+        final PreparedStatement prepared = this.delegate.prepareStatement(sql, autoGeneratedKeys);
+        return new PreparedStatementWrapper(prepared, this.connectionInfo, sql);
+    }
+
+    /**
+     * Prepares {@code sql} on the wrapped connection with the given column
+     * indexes and returns it inside a {@link PreparedStatementWrapper}.
+     */
+    public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
+        final PreparedStatement prepared = this.delegate.prepareStatement(sql, columnIndexes);
+        return new PreparedStatementWrapper(prepared, this.connectionInfo, sql);
+    }
+
+    /**
+     * Prepares {@code sql} on the wrapped connection with the given column
+     * names and returns it inside a {@link PreparedStatementWrapper}.
+     */
+    public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
+        final PreparedStatement prepared = this.delegate.prepareStatement(sql, columnNames);
+        return new PreparedStatementWrapper(prepared, this.connectionInfo, sql);
+    }
+
+    /** Returns a Clob created by the wrapped connection. */
+    public Clob createClob() throws SQLException {
+        final Clob clob = this.delegate.createClob();
+        return clob;
+    }
+
+    /** Returns a Blob created by the wrapped connection. */
+    public Blob createBlob() throws SQLException {
+        final Blob blob = this.delegate.createBlob();
+        return blob;
+    }
+
+    /** Returns an NClob created by the wrapped connection. */
+    public NClob createNClob() throws SQLException {
+        final NClob nclob = this.delegate.createNClob();
+        return nclob;
+    }
+
+    /** Returns an SQLXML object created by the wrapped connection. */
+    public SQLXML createSQLXML() throws SQLException {
+        final SQLXML xml = this.delegate.createSQLXML();
+        return xml;
+    }
+
+    /** Returns the wrapped connection's answer to {@code isValid(timeout)}. */
+    public boolean isValid(int timeout) throws SQLException {
+        final boolean valid = this.delegate.isValid(timeout);
+        return valid;
+    }
+
+    /** Forwards a single client-info property to the wrapped connection. */
+    public void setClientInfo(String name, String value) throws SQLClientInfoException {
+        this.delegate.setClientInfo(name, value);
+    }
+
+    /** Forwards the given client-info properties to the wrapped connection. */
+    public void setClientInfo(Properties properties) throws SQLClientInfoException {
+        this.delegate.setClientInfo(properties);
+    }
+
+    /** Returns the named client-info value as reported by the wrapped connection. */
+    public String getClientInfo(String name) throws SQLException {
+        final String value = this.delegate.getClientInfo(name);
+        return value;
+    }
+
+    /** Returns the client-info properties as reported by the wrapped connection. */
+    public Properties getClientInfo() throws SQLException {
+        final Properties clientInfo = this.delegate.getClientInfo();
+        return clientInfo;
+    }
+
+    /** Returns an SQL array built by the wrapped connection from the given type name and elements. */
+    public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+        final Array array = this.delegate.createArrayOf(typeName, elements);
+        return array;
+    }
+
+    /** Returns an SQL struct built by the wrapped connection from the given type name and attributes. */
+    public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
+        final Struct struct = this.delegate.createStruct(typeName, attributes);
+        return struct;
+    }
+
+    /** Returns the schema name reported by the wrapped connection. */
+    public String getSchema() throws SQLException {
+        final String schema = this.delegate.getSchema();
+        return schema;
+    }
+
+    /** Forwards the abort request, with its executor, to the wrapped connection. */
+    public void abort(Executor executor) throws SQLException {
+        this.delegate.abort(executor);
+    }
+
+    /** Forwards the network timeout, with its executor, to the wrapped connection. */
+    public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
+        this.delegate.setNetworkTimeout(executor, milliseconds);
+    }
+
+    /** Returns the network timeout reported by the wrapped connection. */
+    public int getNetworkTimeout() throws SQLException {
+        final int timeout = this.delegate.getNetworkTimeout();
+        return timeout;
+    }
+
+    /** Passes the unwrap request straight through to the wrapped connection. */
+    public <T> T unwrap(Class<T> iface) throws SQLException {
+        final T unwrapped = this.delegate.unwrap(iface);
+        return unwrapped;
+    }
+
+    /** Returns the wrapped connection's answer to {@code isWrapperFor(iface)}. */
+    public boolean isWrapperFor(Class<?> iface) throws SQLException {
+        final boolean wraps = this.delegate.isWrapperFor(iface);
+        return wraps;
+    }
+
+    /** Forwards {@code createNewIO} and its flag to the wrapped connection. */
+    public void createNewIO(boolean b) {
+        this.delegate.createNewIO(b);
+    }
+
+    /** Returns the id reported by the wrapped connection. */
+    public long getId() {
+        final long id = this.delegate.getId();
+        return id;
+    }
+
+    /** Returns the properties object reported by the wrapped connection. */
+    public Properties getProperties() {
+        final Properties properties = this.delegate.getProperties();
+        return properties;
+    }
+
+    /** Returns the process host reported by the wrapped connection. */
+    public String getProcessHost() {
+        final String processHost = this.delegate.getProcessHost();
+        return processHost;
+    }
+
+    /** Returns the mutex object exposed by the wrapped connection (not one owned by this wrapper). */
+    public Object getConnectionMutex() {
+        final Object mutex = this.delegate.getConnectionMutex();
+        return mutex;
+    }
+
+    /** Returns the URL reported by the wrapped connection. */
+    public String getURL() {
+        final String url = this.delegate.getURL();
+        return url;
+    }
+
+    /** Returns the user reported by the wrapped connection. */
+    public String getUser() {
+        final String user = this.delegate.getUser();
+        return user;
+    }
+
+    /** Returns the exception interceptor reported by the wrapped connection. */
+    public ExceptionInterceptor getExceptionInterceptor() {
+        final ExceptionInterceptor interceptor = this.delegate.getExceptionInterceptor();
+        return interceptor;
+    }
+
+    // The wrapped driver connection; the methods of this class forward to it.
+    private final JdbcConnection delegate;
+    // Connection metadata handed to every statement wrapper this class creates.
+    private final ConnectionInfo connectionInfo;
+    // Backing storage for getSkyWalkingDynamicField / setSkyWalkingDynamicField.
+    private Object dynamicField;
+
+    /** Returns whatever value was last stored through {@code setSkyWalkingDynamicField}. */
+    @Override
+    public Object getSkyWalkingDynamicField() {
+        return this.dynamicField;
+    }
+
+    /** Stores {@code value} so that {@code getSkyWalkingDynamicField} can return it later. */
+    @Override
+    public void setSkyWalkingDynamicField(Object value) {
+        dynamicField = value;
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/LoadBalancedConnectionWrapper.java
 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/LoadBalancedConnectionWrapper.java
new file mode 100644
index 000000000..5263e45dd
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/LoadBalancedConnectionWrapper.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.jdbc.mysql.wrapper;
+
+import com.mysql.cj.api.jdbc.ha.LoadBalancedConnection;
+import java.sql.SQLException;
+import org.apache.skywalking.apm.plugin.jdbc.trace.ConnectionInfo;
+
+/**
+ * {@link JdbcConnectionWrapper} variant for load-balanced connections.
+ * The four {@link LoadBalancedConnection}-specific calls are forwarded to the
+ * wrapped load-balanced connection; everything else is inherited from the
+ * superclass.
+ */
+public class LoadBalancedConnectionWrapper extends JdbcConnectionWrapper
+    implements LoadBalancedConnection {
+
+    /**
+     * Typed reference to the wrapped connection. The superclass keeps its own
+     * private reference, so this one has to be assigned separately.
+     */
+    private final LoadBalancedConnection delegate;
+
+    /**
+     * @param delegate the load-balanced connection to forward to
+     * @param info     connection metadata passed on to the superclass
+     */
+    public LoadBalancedConnectionWrapper(LoadBalancedConnection delegate, ConnectionInfo info) {
+        super(delegate, info);
+        // This field was never assigned before, so every method below threw
+        // NullPointerException on first use.
+        this.delegate = delegate;
+    }
+
+    /** Forwards to the wrapped connection's {@code addHost}. */
+    @Override
+    public boolean addHost(String s) throws SQLException {
+        return delegate.addHost(s);
+    }
+
+    /** Forwards to the wrapped connection's {@code removeHost}. */
+    @Override
+    public void removeHost(String s) throws SQLException {
+        delegate.removeHost(s);
+    }
+
+    /** Forwards to the wrapped connection's {@code removeHostWhenNotInUse}. */
+    @Override
+    public void removeHostWhenNotInUse(String s) throws SQLException {
+        delegate.removeHostWhenNotInUse(s);
+    }
+
+    /** Forwards to the wrapped connection's {@code ping}. */
+    @Override
+    public void ping(boolean b) throws SQLException {
+        delegate.ping(b);
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/PreparedStatementWrapper.java
 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/PreparedStatementWrapper.java
new file mode 100644
index 000000000..9854c86b0
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/PreparedStatementWrapper.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.jdbc.mysql.wrapper;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.Date;
+import java.sql.NClob;
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.Ref;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.RowId;
+import java.sql.SQLException;
+import java.sql.SQLType;
+import java.sql.SQLXML;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Calendar;
+import org.apache.skywalking.apm.plugin.jdbc.trace.ConnectionInfo;
+
+/**
+ * {@link PreparedStatement} decorator that forwards every call to a wrapped
+ * statement. The parameterless execute methods go through
+ * {@link TracingUtils#trace} together with the SQL text captured at prepare
+ * time; parameter setters and metadata getters are plain delegation.
+ */
+public class PreparedStatementWrapper extends StatementWrapper implements PreparedStatement {
+
+    /** The wrapped statement; the superclass holds its own private reference typed as Statement. */
+    private final PreparedStatement statement;
+    /** SQL text supplied when the statement was prepared; passed to the tracer on execution. */
+    private final String sql;
+
+    /**
+     * @param statement      the prepared statement to forward to
+     * @param connectionInfo connection metadata passed to the tracer
+     * @param sql            SQL text the statement was prepared with
+     * @param statementType  label passed to the superclass as its statement type
+     */
+    public PreparedStatementWrapper(PreparedStatement statement, ConnectionInfo connectionInfo, String sql,
+        String statementType) {
+        super(statement, connectionInfo, statementType);
+        this.statement = statement;
+        this.sql = sql;
+    }
+
+    /** Same as the four-argument constructor with the statement type {@code "PreparedStatement"}. */
+    public PreparedStatementWrapper(PreparedStatement statement, ConnectionInfo connectionInfo, String sql) {
+        this(statement, connectionInfo, sql, "PreparedStatement");
+    }
+
+    /** Runs the wrapped statement's {@code executeQuery()} through the tracer. */
+    @Override
+    public ResultSet executeQuery() throws SQLException {
+        return TracingUtils.trace(connectionInfo, "executeQuery", sql, stateType,
+            new TracingUtils.Executable<ResultSet>() {
+                @Override
+                public ResultSet exe(String sql) throws SQLException {
+                    return statement.executeQuery();
+                }
+            });
+    }
+
+    /** Runs the wrapped statement's {@code executeUpdate()} through the tracer. */
+    @Override
+    public int executeUpdate() throws SQLException {
+        return TracingUtils.trace(connectionInfo, "executeUpdate", sql, stateType,
+            new TracingUtils.Executable<Integer>() {
+                @Override
+                public Integer exe(String sql) throws SQLException {
+                    return statement.executeUpdate();
+                }
+            });
+    }
+
+    /**
+     * Runs the wrapped statement's {@code executeLargeUpdate()} through the
+     * tracer. Without this override callers would reach the
+     * {@link PreparedStatement} interface default, which throws
+     * {@link UnsupportedOperationException} instead of reaching the driver.
+     */
+    @Override
+    public long executeLargeUpdate() throws SQLException {
+        return TracingUtils.trace(connectionInfo, "executeLargeUpdate", sql, stateType,
+            new TracingUtils.Executable<Long>() {
+                @Override
+                public Long exe(String sql) throws SQLException {
+                    return statement.executeLargeUpdate();
+                }
+            });
+    }
+
+    /**
+     * Runs the wrapped statement's {@code execute()} through the tracer.
+     */
+    @Override
+    public boolean execute() throws SQLException {
+        return TracingUtils.trace(connectionInfo, "execute", sql, stateType,
+            new TracingUtils.Executable<Boolean>() {
+                @Override
+                public Boolean exe(String sql) throws SQLException {
+                    // Must be the no-arg form: Statement.execute(String) is not
+                    // allowed on a PreparedStatement and would discard the
+                    // bound parameters.
+                    return statement.execute();
+                }
+            });
+    }
+
+    // ---- Everything below is plain delegation to the wrapped statement. ----
+
+    @Override
+    public void setNull(int parameterIndex, int sqlType) throws SQLException {
+        statement.setNull(parameterIndex, sqlType);
+    }
+
+    @Override
+    public void setBoolean(int parameterIndex, boolean x) throws SQLException {
+        statement.setBoolean(parameterIndex, x);
+    }
+
+    @Override
+    public void setByte(int parameterIndex, byte x) throws SQLException {
+        statement.setByte(parameterIndex, x);
+    }
+
+    @Override
+    public void setShort(int parameterIndex, short x) throws SQLException {
+        statement.setShort(parameterIndex, x);
+    }
+
+    @Override
+    public void setInt(int parameterIndex, int x) throws SQLException {
+        statement.setInt(parameterIndex, x);
+    }
+
+    @Override
+    public void setLong(int parameterIndex, long x) throws SQLException {
+        statement.setLong(parameterIndex, x);
+    }
+
+    @Override
+    public void setFloat(int parameterIndex, float x) throws SQLException {
+        statement.setFloat(parameterIndex, x);
+    }
+
+    @Override
+    public void setDouble(int parameterIndex, double x) throws SQLException {
+        statement.setDouble(parameterIndex, x);
+    }
+
+    @Override
+    public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException {
+        statement.setBigDecimal(parameterIndex, x);
+    }
+
+    @Override
+    public void setString(int parameterIndex, String x) throws SQLException {
+        statement.setString(parameterIndex, x);
+    }
+
+    @Override
+    public void setBytes(int parameterIndex, byte[] x) throws SQLException {
+        statement.setBytes(parameterIndex, x);
+    }
+
+    @Override
+    public void setDate(int parameterIndex, Date x) throws SQLException {
+        statement.setDate(parameterIndex, x);
+    }
+
+    @Override
+    public void setTime(int parameterIndex, Time x) throws SQLException {
+        statement.setTime(parameterIndex, x);
+    }
+
+    @Override
+    public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException {
+        statement.setTimestamp(parameterIndex, x);
+    }
+
+    @Override
+    public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException {
+        statement.setAsciiStream(parameterIndex, x, length);
+    }
+
+    @Override
+    @Deprecated
+    public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException {
+        statement.setUnicodeStream(parameterIndex, x, length);
+    }
+
+    @Override
+    public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException {
+        statement.setBinaryStream(parameterIndex, x, length);
+    }
+
+    @Override
+    public void clearParameters() throws SQLException {
+        statement.clearParameters();
+    }
+
+    @Override
+    public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException {
+        statement.setObject(parameterIndex, x, targetSqlType);
+    }
+
+    @Override
+    public void setObject(int parameterIndex, Object x) throws SQLException {
+        statement.setObject(parameterIndex, x);
+    }
+
+    @Override
+    public void addBatch() throws SQLException {
+        statement.addBatch();
+    }
+
+    @Override
+    public void setCharacterStream(int parameterIndex, Reader reader, int length) throws SQLException {
+        statement.setCharacterStream(parameterIndex, reader, length);
+    }
+
+    @Override
+    public void setRef(int parameterIndex, Ref x) throws SQLException {
+        statement.setRef(parameterIndex, x);
+    }
+
+    @Override
+    public void setBlob(int parameterIndex, Blob x) throws SQLException {
+        statement.setBlob(parameterIndex, x);
+    }
+
+    @Override
+    public void setClob(int parameterIndex, Clob x) throws SQLException {
+        statement.setClob(parameterIndex, x);
+    }
+
+    @Override
+    public void setArray(int parameterIndex, Array x) throws SQLException {
+        statement.setArray(parameterIndex, x);
+    }
+
+    @Override
+    public ResultSetMetaData getMetaData() throws SQLException {
+        return statement.getMetaData();
+    }
+
+    @Override
+    public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException {
+        statement.setDate(parameterIndex, x, cal);
+    }
+
+    @Override
+    public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException {
+        statement.setTime(parameterIndex, x, cal);
+    }
+
+    @Override
+    public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException {
+        statement.setTimestamp(parameterIndex, x, cal);
+    }
+
+    @Override
+    public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException {
+        statement.setNull(parameterIndex, sqlType, typeName);
+    }
+
+    @Override
+    public void setURL(int parameterIndex, URL x) throws SQLException {
+        statement.setURL(parameterIndex, x);
+    }
+
+    @Override
+    public ParameterMetaData getParameterMetaData() throws SQLException {
+        return statement.getParameterMetaData();
+    }
+
+    @Override
+    public void setRowId(int parameterIndex, RowId x) throws SQLException {
+        statement.setRowId(parameterIndex, x);
+    }
+
+    @Override
+    public void setNString(int parameterIndex, String value) throws SQLException {
+        statement.setNString(parameterIndex, value);
+    }
+
+    @Override
+    public void setNCharacterStream(int parameterIndex, Reader value, long length) throws SQLException {
+        statement.setNCharacterStream(parameterIndex, value, length);
+    }
+
+    @Override
+    public void setNClob(int parameterIndex, NClob value) throws SQLException {
+        statement.setNClob(parameterIndex, value);
+    }
+
+    @Override
+    public void setClob(int parameterIndex, Reader reader, long length) throws SQLException {
+        statement.setClob(parameterIndex, reader, length);
+    }
+
+    @Override
+    public void setBlob(int parameterIndex, InputStream inputStream, long length) throws SQLException {
+        statement.setBlob(parameterIndex, inputStream, length);
+    }
+
+    @Override
+    public void setNClob(int parameterIndex, Reader reader, long length) throws SQLException {
+        statement.setNClob(parameterIndex, reader, length);
+    }
+
+    @Override
+    public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException {
+        statement.setSQLXML(parameterIndex, xmlObject);
+    }
+
+    @Override
+    public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength)
+        throws SQLException {
+        statement.setObject(parameterIndex, x, targetSqlType, scaleOrLength);
+    }
+
+    @Override
+    public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException {
+        statement.setAsciiStream(parameterIndex, x, length);
+    }
+
+    @Override
+    public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException {
+        statement.setBinaryStream(parameterIndex, x, length);
+    }
+
+    @Override
+    public void setCharacterStream(int parameterIndex, Reader reader, long length) throws SQLException {
+        statement.setCharacterStream(parameterIndex, reader, length);
+    }
+
+    @Override
+    public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException {
+        statement.setAsciiStream(parameterIndex, x);
+    }
+
+    @Override
+    public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException {
+        statement.setBinaryStream(parameterIndex, x);
+    }
+
+    @Override
+    public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException {
+        statement.setCharacterStream(parameterIndex, reader);
+    }
+
+    @Override
+    public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException {
+        statement.setNCharacterStream(parameterIndex, value);
+    }
+
+    @Override
+    public void setClob(int parameterIndex, Reader reader) throws SQLException {
+        statement.setClob(parameterIndex, reader);
+    }
+
+    @Override
+    public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException {
+        statement.setBlob(parameterIndex, inputStream);
+    }
+
+    @Override
+    public void setNClob(int parameterIndex, Reader reader) throws SQLException {
+        statement.setNClob(parameterIndex, reader);
+    }
+
+    @Override
+    public void setObject(int parameterIndex, Object x, SQLType targetSqlType, int scaleOrLength)
+        throws SQLException {
+        statement.setObject(parameterIndex, x, targetSqlType, scaleOrLength);
+    }
+
+    @Override
+    public void setObject(int parameterIndex, Object x, SQLType targetSqlType) throws SQLException {
+        statement.setObject(parameterIndex, x, targetSqlType);
+    }
+
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/ReplicationConnectionWrapper.java
 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/ReplicationConnectionWrapper.java
new file mode 100644
index 000000000..2b8fa00be
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/ReplicationConnectionWrapper.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.jdbc.mysql.wrapper;
+
+import com.mysql.cj.api.jdbc.JdbcConnection;
+import com.mysql.cj.api.jdbc.ha.ReplicationConnection;
+import java.sql.SQLException;
+import org.apache.skywalking.apm.plugin.jdbc.trace.ConnectionInfo;
+
+/**
+ * {@link JdbcConnectionWrapper} variant for replication connections.
+ * The {@link ReplicationConnection}-specific calls are forwarded to the
+ * wrapped replication connection; everything else is inherited from the
+ * superclass.
+ */
+public class ReplicationConnectionWrapper extends JdbcConnectionWrapper
+    implements ReplicationConnection {
+
+    /** Typed view of the wrapped connection, used for the replication-only calls below. */
+    private final ReplicationConnection replicationConnection;
+
+    /**
+     * @param delegate       connection to forward to; must be a {@link ReplicationConnection}
+     * @param connectionInfo connection metadata passed on to the superclass
+     * @throws IllegalArgumentException if {@code delegate} is not a {@link ReplicationConnection}
+     */
+    public ReplicationConnectionWrapper(JdbcConnection delegate, ConnectionInfo connectionInfo) {
+        super(delegate, connectionInfo);
+        // The field was never assigned before, so every method below threw
+        // NullPointerException. Fail early with a clear message instead.
+        if (!(delegate instanceof ReplicationConnection)) {
+            throw new IllegalArgumentException("delegate must be a ReplicationConnection: " + delegate);
+        }
+        this.replicationConnection = (ReplicationConnection) delegate;
+    }
+
+    @Override
+    public long getConnectionGroupId() {
+        return replicationConnection.getConnectionGroupId();
+    }
+
+    /** Returns the wrapped connection's current connection as-is (it is not re-wrapped here). */
+    @Override
+    public JdbcConnection getCurrentConnection() {
+        return replicationConnection.getCurrentConnection();
+    }
+
+    /** Returns the wrapped connection's master connection as-is (it is not re-wrapped here). */
+    @Override
+    public JdbcConnection getMasterConnection() {
+        return replicationConnection.getMasterConnection();
+    }
+
+    @Override
+    public void promoteSlaveToMaster(String s) throws SQLException {
+        replicationConnection.promoteSlaveToMaster(s);
+    }
+
+    @Override
+    public void removeMasterHost(String s) throws SQLException {
+        replicationConnection.removeMasterHost(s);
+    }
+
+    @Override
+    public void removeMasterHost(String s, boolean b) throws SQLException {
+        replicationConnection.removeMasterHost(s, b);
+    }
+
+    @Override
+    public boolean isHostMaster(String s) {
+        return replicationConnection.isHostMaster(s);
+    }
+
+    /** Returns the wrapped connection's slaves connection as-is (it is not re-wrapped here). */
+    @Override
+    public JdbcConnection getSlavesConnection() {
+        return replicationConnection.getSlavesConnection();
+    }
+
+    @Override
+    public void addSlaveHost(String s) throws SQLException {
+        replicationConnection.addSlaveHost(s);
+    }
+
+    @Override
+    public void removeSlave(String s) throws SQLException {
+        replicationConnection.removeSlave(s);
+    }
+
+    @Override
+    public void removeSlave(String s, boolean b) throws SQLException {
+        replicationConnection.removeSlave(s, b);
+    }
+
+    @Override
+    public boolean isHostSlave(String s) {
+        return replicationConnection.isHostSlave(s);
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/StatementWrapper.java
 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/StatementWrapper.java
new file mode 100644
index 000000000..1226b5f1b
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/StatementWrapper.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.jdbc.mysql.wrapper;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.Statement;
+import org.apache.skywalking.apm.plugin.jdbc.trace.ConnectionInfo;
+
+/**
+ * {@link Statement} decorator that forwards every call to a wrapped
+ * statement. All execute* methods go through {@link TracingUtils#trace},
+ * passing the connection info, the method name, the SQL text (or
+ * {@code null} for batch execution) and this wrapper's statement type;
+ * the remaining methods are plain delegation.
+ */
+public class StatementWrapper implements Statement {
+
+    /** The wrapped statement that performs the actual work. */
+    private final Statement statement;
+    /** Connection metadata passed to the tracer on every execution. */
+    protected final ConnectionInfo connectionInfo;
+    /** Statement-type label passed to the tracer (for example {@code "Statement"}). */
+    protected final String stateType;
+
+    /**
+     * @param statement      the statement to forward to
+     * @param connectionInfo connection metadata passed to the tracer
+     * @param stateType      statement-type label passed to the tracer
+     */
+    public StatementWrapper(Statement statement, ConnectionInfo connectionInfo, String stateType) {
+        this.statement = statement;
+        this.connectionInfo = connectionInfo;
+        this.stateType = stateType;
+    }
+
+    /** Same as the three-argument constructor with the statement type {@code "Statement"}. */
+    public StatementWrapper(Statement statement, ConnectionInfo connectionInfo) {
+        this(statement, connectionInfo, "Statement");
+    }
+
+    // ---- Traced execution methods ----
+
+    @Override
+    public ResultSet executeQuery(String sql) throws SQLException {
+        return TracingUtils.trace(connectionInfo, "executeQuery", sql, stateType,
+            new TracingUtils.Executable<ResultSet>() {
+                @Override
+                public ResultSet exe(String sql) throws SQLException {
+                    return statement.executeQuery(sql);
+                }
+            });
+    }
+
+    @Override
+    public int executeUpdate(String sql) throws SQLException {
+        return TracingUtils.trace(connectionInfo, "executeUpdate", sql, stateType,
+            new TracingUtils.Executable<Integer>() {
+                @Override
+                public Integer exe(String sql) throws SQLException {
+                    return statement.executeUpdate(sql);
+                }
+            });
+    }
+
+    @Override
+    public int executeUpdate(String sql, final int autoGeneratedKeys) throws SQLException {
+        return TracingUtils.trace(connectionInfo, "executeUpdate", sql, stateType,
+            new TracingUtils.Executable<Integer>() {
+                @Override
+                public Integer exe(String sql) throws SQLException {
+                    return statement.executeUpdate(sql, autoGeneratedKeys);
+                }
+            });
+    }
+
+    @Override
+    public int executeUpdate(String sql, final int[] columnIndexes) throws SQLException {
+        return TracingUtils.trace(connectionInfo, "executeUpdate", sql, stateType,
+            new TracingUtils.Executable<Integer>() {
+                @Override
+                public Integer exe(String sql) throws SQLException {
+                    return statement.executeUpdate(sql, columnIndexes);
+                }
+            });
+    }
+
+    @Override
+    public int executeUpdate(String sql, final String[] columnNames) throws SQLException {
+        return TracingUtils.trace(connectionInfo, "executeUpdate", sql, stateType,
+            new TracingUtils.Executable<Integer>() {
+                @Override
+                public Integer exe(String sql) throws SQLException {
+                    return statement.executeUpdate(sql, columnNames);
+                }
+            });
+    }
+
+    @Override
+    public boolean execute(String sql) throws SQLException {
+        return TracingUtils.trace(connectionInfo, "execute", sql, stateType,
+            new TracingUtils.Executable<Boolean>() {
+                @Override
+                public Boolean exe(String sql) throws SQLException {
+                    return statement.execute(sql);
+                }
+            });
+    }
+
+    @Override
+    public boolean execute(String sql, final int autoGeneratedKeys) throws SQLException {
+        return TracingUtils.trace(connectionInfo, "execute", sql, stateType,
+            new TracingUtils.Executable<Boolean>() {
+                @Override
+                public Boolean exe(String sql) throws SQLException {
+                    return statement.execute(sql, autoGeneratedKeys);
+                }
+            });
+    }
+
+    @Override
+    public boolean execute(String sql, final int[] columnIndexes) throws SQLException {
+        return TracingUtils.trace(connectionInfo, "execute", sql, stateType,
+            new TracingUtils.Executable<Boolean>() {
+                @Override
+                public Boolean exe(String sql) throws SQLException {
+                    return statement.execute(sql, columnIndexes);
+                }
+            });
+    }
+
+    @Override
+    public boolean execute(String sql, final String[] columnNames) throws SQLException {
+        return TracingUtils.trace(connectionInfo, "execute", sql, stateType,
+            new TracingUtils.Executable<Boolean>() {
+                @Override
+                public Boolean exe(String sql) throws SQLException {
+                    return statement.execute(sql, columnNames);
+                }
+            });
+    }
+
+    /** Batch execution has no single SQL text, so {@code null} is passed to the tracer. */
+    @Override
+    public int[] executeBatch() throws SQLException {
+        return TracingUtils.trace(connectionInfo, "executeBatch", null, stateType,
+            new TracingUtils.Executable<int[]>() {
+                @Override
+                public int[] exe(String sql) throws SQLException {
+                    return statement.executeBatch();
+                }
+            });
+    }
+
+    /**
+     * Traced the same way as {@link #executeBatch()}; this call previously
+     * bypassed the tracer, unlike every other execute* method in this class.
+     */
+    @Override
+    public long[] executeLargeBatch() throws SQLException {
+        return TracingUtils.trace(connectionInfo, "executeLargeBatch", null, stateType,
+            new TracingUtils.Executable<long[]>() {
+                @Override
+                public long[] exe(String sql) throws SQLException {
+                    return statement.executeLargeBatch();
+                }
+            });
+    }
+
+    @Override
+    public long executeLargeUpdate(String sql) throws SQLException {
+        return TracingUtils.trace(connectionInfo, "executeLargeUpdate", sql, stateType,
+            new TracingUtils.Executable<Long>() {
+                @Override
+                public Long exe(String sql) throws SQLException {
+                    return statement.executeLargeUpdate(sql);
+                }
+            });
+    }
+
+    @Override
+    public long executeLargeUpdate(String sql, final int autoGeneratedKeys) throws SQLException {
+        return TracingUtils.trace(connectionInfo, "executeLargeUpdate", sql, stateType,
+            new TracingUtils.Executable<Long>() {
+                @Override
+                public Long exe(String sql) throws SQLException {
+                    return statement.executeLargeUpdate(sql, autoGeneratedKeys);
+                }
+            });
+    }
+
+    @Override
+    public long executeLargeUpdate(String sql, final int[] columnIndexes) throws SQLException {
+        return TracingUtils.trace(connectionInfo, "executeLargeUpdate", sql, stateType,
+            new TracingUtils.Executable<Long>() {
+                @Override
+                public Long exe(String sql) throws SQLException {
+                    return statement.executeLargeUpdate(sql, columnIndexes);
+                }
+            });
+    }
+
+    @Override
+    public long executeLargeUpdate(String sql, final String[] columnNames) throws SQLException {
+        return TracingUtils.trace(connectionInfo, "executeLargeUpdate", sql, stateType,
+            new TracingUtils.Executable<Long>() {
+                @Override
+                public Long exe(String sql) throws SQLException {
+                    return statement.executeLargeUpdate(sql, columnNames);
+                }
+            });
+    }
+
+    // ---- Plain delegation to the wrapped statement ----
+
+    @Override
+    public void close() throws SQLException {
+        statement.close();
+    }
+
+    @Override
+    public int getMaxFieldSize() throws SQLException {
+        return statement.getMaxFieldSize();
+    }
+
+    @Override
+    public void setMaxFieldSize(int max) throws SQLException {
+        statement.setMaxFieldSize(max);
+    }
+
+    @Override
+    public int getMaxRows() throws SQLException {
+        return statement.getMaxRows();
+    }
+
+    @Override
+    public void setMaxRows(int max) throws SQLException {
+        statement.setMaxRows(max);
+    }
+
+    @Override
+    public void setEscapeProcessing(boolean enable) throws SQLException {
+        statement.setEscapeProcessing(enable);
+    }
+
+    @Override
+    public int getQueryTimeout() throws SQLException {
+        return statement.getQueryTimeout();
+    }
+
+    @Override
+    public void setQueryTimeout(int seconds) throws SQLException {
+        statement.setQueryTimeout(seconds);
+    }
+
+    @Override
+    public void cancel() throws SQLException {
+        statement.cancel();
+    }
+
+    @Override
+    public SQLWarning getWarnings() throws SQLException {
+        return statement.getWarnings();
+    }
+
+    @Override
+    public void clearWarnings() throws SQLException {
+        statement.clearWarnings();
+    }
+
+    @Override
+    public void setCursorName(String name) throws SQLException {
+        statement.setCursorName(name);
+    }
+
+    @Override
+    public ResultSet getResultSet() throws SQLException {
+        return statement.getResultSet();
+    }
+
+    @Override
+    public int getUpdateCount() throws SQLException {
+        return statement.getUpdateCount();
+    }
+
+    @Override
+    public boolean getMoreResults() throws SQLException {
+        return statement.getMoreResults();
+    }
+
+    @Override
+    public void setFetchDirection(int direction) throws SQLException {
+        statement.setFetchDirection(direction);
+    }
+
+    @Override
+    public int getFetchDirection() throws SQLException {
+        return statement.getFetchDirection();
+    }
+
+    @Override
+    public void setFetchSize(int rows) throws SQLException {
+        statement.setFetchSize(rows);
+    }
+
+    @Override
+    public int getFetchSize() throws SQLException {
+        return statement.getFetchSize();
+    }
+
+    @Override
+    public int getResultSetConcurrency() throws SQLException {
+        return statement.getResultSetConcurrency();
+    }
+
+    @Override
+    public int getResultSetType() throws SQLException {
+        return statement.getResultSetType();
+    }
+
+    @Override
+    public void addBatch(String sql) throws SQLException {
+        statement.addBatch(sql);
+    }
+
+    @Override
+    public void clearBatch() throws SQLException {
+        statement.clearBatch();
+    }
+
+    /** Returns the wrapped statement's connection as-is (it is not re-wrapped here). */
+    @Override
+    public Connection getConnection() throws SQLException {
+        return statement.getConnection();
+    }
+
+    @Override
+    public boolean getMoreResults(int current) throws SQLException {
+        return statement.getMoreResults(current);
+    }
+
+    @Override
+    public ResultSet getGeneratedKeys() throws SQLException {
+        return statement.getGeneratedKeys();
+    }
+
+    @Override
+    public int getResultSetHoldability() throws SQLException {
+        return statement.getResultSetHoldability();
+    }
+
+    @Override
+    public boolean isClosed() throws SQLException {
+        return statement.isClosed();
+    }
+
+    @Override
+    public void setPoolable(boolean poolable) throws SQLException {
+        statement.setPoolable(poolable);
+    }
+
+    @Override
+    public boolean isPoolable() throws SQLException {
+        return statement.isPoolable();
+    }
+
+    @Override
+    public void closeOnCompletion() throws SQLException {
+        statement.closeOnCompletion();
+    }
+
+    @Override
+    public boolean isCloseOnCompletion() throws SQLException {
+        return statement.isCloseOnCompletion();
+    }
+
+    @Override
+    public long getLargeUpdateCount() throws SQLException {
+        return statement.getLargeUpdateCount();
+    }
+
+    @Override
+    public void setLargeMaxRows(long max) throws SQLException {
+        statement.setLargeMaxRows(max);
+    }
+
+    @Override
+    public long getLargeMaxRows() throws SQLException {
+        return statement.getLargeMaxRows();
+    }
+
+    @Override
+    public <T> T unwrap(Class<T> iface) throws SQLException {
+        return statement.unwrap(iface);
+    }
+
+    @Override
+    public boolean isWrapperFor(Class<?> iface) throws SQLException {
+        return statement.isWrapperFor(iface);
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/TracingUtils.java
 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/TracingUtils.java
new file mode 100644
index 000000000..9c6add1cc
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/TracingUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.jdbc.mysql.wrapper;
+
+import java.sql.SQLException;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.plugin.jdbc.trace.ConnectionInfo;
+
+/**
+ * Helper that runs a single JDBC call inside an exit span created through
+ * {@link ContextManager}.
+ */
+public class TracingUtils {
+
+    /**
+     * Runs {@code exec} inside a newly created exit span.
+     *
+     * The span's operation name is built from the database type, the literal
+     * "/JDBI/" segment, the statement type and the method name. The span is tagged
+     * with the DB type "sql", the database name and the SQL text, given the
+     * connection's component, and marked as a DB-layer span.
+     *
+     * @param connectInfo   source of the DB type, peer, database name and component
+     * @param method        name of the JDBC method being traced
+     * @param sql           SQL text, recorded on the span and passed to {@code exec}
+     * @param statementType statement-type label used in the operation name
+     * @param exec          the call to run inside the span
+     * @param <R>           result type of the traced call
+     * @return the value returned by {@code exec}
+     * @throws SQLException rethrown from {@code exec} after being logged on the span
+     */
+    public static <R> R trace(ConnectionInfo connectInfo, String method, String sql, String statementType,
+        TracingUtils.Executable<R> exec)
+        throws SQLException {
+        // Create the span before entering the try block: if creation itself fails there is
+        // nothing of ours to stop, and stopping anyway would pop the caller's span.
+        AbstractSpan span = ContextManager.createExitSpan(
+            connectInfo.getDBType() + "/JDBI/" + statementType + "/" + method, connectInfo.getDatabasePeer());
+        try {
+            Tags.DB_TYPE.set(span, "sql");
+            Tags.DB_INSTANCE.set(span, connectInfo.getDatabaseName());
+            Tags.DB_STATEMENT.set(span, sql);
+            span.setComponent(connectInfo.getComponent());
+            SpanLayer.asDB(span);
+            return exec.exe(sql);
+        } catch (SQLException e) {
+            // Record the failure on the span created above rather than on whatever is active.
+            span.errorOccurred();
+            span.log(e);
+            throw e;
+        } catch (RuntimeException e) {
+            // Unchecked failures are recorded too, so the span is not reported as successful.
+            span.errorOccurred();
+            span.log(e);
+            throw e;
+        } finally {
+            ContextManager.stopSpan();
+        }
+    }
+
+    /**
+     * Callback carrying the JDBC call to be traced.
+     *
+     * @param <R> result type of the call
+     */
+    public interface Executable<R> {
+        /**
+         * Performs the call.
+         *
+         * @param sql the SQL text passed to {@link TracingUtils#trace}
+         * @return the result of the call
+         * @throws SQLException if the call fails
+         */
+        R exe(String sql) throws SQLException;
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/resources/skywalking-plugin.def
 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/resources/skywalking-plugin.def
index 65c38ead8..ee872d248 100644
--- 
a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/resources/skywalking-plugin.def
+++ 
b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/resources/skywalking-plugin.def
@@ -20,3 +20,6 @@ 
mysql-5.x=org.apache.skywalking.apm.plugin.jdbc.mysql.define.Mysql50ConnectionIn
 
mysql-5.x=org.apache.skywalking.apm.plugin.jdbc.mysql.define.CallableInstrumentation
 
mysql-5.x=org.apache.skywalking.apm.plugin.jdbc.mysql.define.PreparedStatementInstrumentation
 
mysql-5.x=org.apache.skywalking.apm.plugin.jdbc.mysql.define.StatementInstrumentation
+mysql-5.x=org.apache.skywalking.apm.plugin.jdbc.mysql.define.FailoverConnectionProxyInstrumentation
+mysql-5.x=org.apache.skywalking.apm.plugin.jdbc.mysql.define.LoadBalancedConnectionProxyInstrumentation
+mysql-5.x=org.apache.skywalking.apm.plugin.jdbc.mysql.define.ReplicationConnectionProxyInstrumentation


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to